Skip to main content

cognee_delete/
lib.rs

//! Cascading deletion of data and datasets across all Cognee backends.
//!
//! Removes content in dependency order so no orphaned references remain:
//! graph DB and vector DB entries are cleaned up first (while relational
//! provenance still exists), then relational rows are removed together with
//! their raw files in file storage. Supports dry-run previews.
//!
//! Main types: [`DeleteService`] and [`AuthorizedDeleteService`] (the
//! permission-checked wrapper).
8
9mod authorized;
10
11pub use authorized::AuthorizedDeleteService;
12
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15
16use cognee_core::pipeline_run_registry::{
17    ids::{pipeline_id, pipeline_run_id},
18    run_info_for_initiated,
19};
20use cognee_database::{DeleteDb, GraphEdge, GraphNode, PipelineRunRepository, PipelineRunStatus};
21use cognee_graph::GraphDBTrait;
22use cognee_models::{Dataset, EdgeType, Triplet};
23use cognee_session::SessionStore;
24use cognee_storage::{StorageError, StorageTrait};
25use cognee_vector::VectorDB;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28use tracing::{info, warn};
29use uuid::Uuid;
30
31/// Map a `DeleteScope` variant to a human-readable label matching Python's
32/// `COGNEE_FORGET_TARGET` values.
33fn scope_label(scope: &DeleteScope) -> &'static str {
34    match scope {
35        DeleteScope::Data { .. } => "data_item",
36        DeleteScope::Dataset { .. } => "dataset",
37        DeleteScope::User { .. } => "user",
38        DeleteScope::All => "everything",
39    }
40}
41
42/// Map a `DeleteMode` variant to a human-readable label.
43fn mode_label(mode: &DeleteMode) -> &'static str {
44    match mode {
45        DeleteMode::Soft => "soft",
46        DeleteMode::Hard => "hard",
47    }
48}
49
/// Fallback vector collections used when `list_collections()` returns an empty
/// list (e.g. backends that do not implement dynamic discovery).
///
/// Each entry is `(collection_name, field_name)`. The second element is
/// presumably the payload field the collection was embedded from — NOTE(review):
/// confirm against the vector cleanup code, which is not visible here.
const FALLBACK_VECTOR_COLLECTIONS: &[(&str, &str)] = &[
    ("DocumentChunk", "text"),
    ("Entity", "name"),
    ("EntityType", "name"),
    ("TextSummary", "text"),
    ("EdgeType", "relationship_name"),
    ("Triplet", "text"),
    ("Event", "name"),
];
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum DeleteScope {
64    Data {
65        owner_id: Uuid,
66        data_id: Uuid,
67        dataset_name: Option<String>,
68        /// When `true`, automatically delete the owning dataset if it becomes
69        /// empty after this data item is removed. Mirrors Python's
70        /// `delete_dataset_if_empty` parameter. Defaults to `false`.
71        #[serde(default)]
72        delete_dataset_if_empty: bool,
73    },
74    Dataset {
75        owner_id: Uuid,
76        dataset_name: String,
77    },
78    User {
79        owner_id: Uuid,
80    },
81    All,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub enum DeleteMode {
86    Soft,
87    Hard,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct DeleteRequest {
92    pub scope: DeleteScope,
93    pub mode: DeleteMode,
94    /// When true: delete graph + vector only; preserve relational rows and raw
95    /// files; force a cognify-only pipeline-status reset. Mirrors Python's
96    /// `*_memory_only` forget targets.
97    #[serde(default)]
98    pub memory_only: bool,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, Default)]
102pub struct DeletePreview {
103    pub datasets_to_delete: usize,
104    pub dataset_links_to_delete: usize,
105    pub data_to_delete: usize,
106    pub storage_files_to_delete: usize,
107    pub graph_nodes_to_delete: usize,
108    pub vector_points_to_delete: usize,
109    pub provenance_nodes_to_delete: usize,
110    pub provenance_edges_to_delete: usize,
111    pub search_queries_to_delete: usize,
112    pub orphaned_edge_types_to_delete: usize,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize, Default)]
116pub struct DeleteResult {
117    pub deleted_datasets: usize,
118    pub deleted_dataset_links: usize,
119    pub deleted_data: usize,
120    pub deleted_storage_files: usize,
121    pub deleted_graph_nodes: usize,
122    pub deleted_vector_points: usize,
123    pub deleted_provenance_nodes: usize,
124    pub deleted_provenance_edges: usize,
125    pub deleted_orphan_entities: usize,
126    pub deleted_orphan_entity_types: usize,
127    pub deleted_orphan_edge_types: usize,
128    pub deleted_pipeline_runs: usize,
129    pub cleared_pipeline_statuses: usize,
130    pub deleted_search_queries: usize,
131    pub pruned_sessions: bool,
132    pub warnings: Vec<String>,
133}
134
/// Errors returned by delete operations.
#[derive(Debug, Error)]
pub enum DeleteError {
    /// The request was invalid; the message is shown verbatim.
    #[error("{0}")]
    Validation(String),

    /// A backend call (database, storage, session store, ...) failed; the
    /// message names the failing operation.
    #[error("{0}")]
    Runtime(String),

    /// Graph database cleanup failed.
    #[error("Graph cleanup failed: {0}")]
    GraphCleanup(String),

    /// Vector database cleanup failed.
    #[error("Vector cleanup failed: {0}")]
    VectorCleanup(String),

    /// The caller may not perform this delete — presumably raised by the
    /// permission-checked `AuthorizedDeleteService` (NOTE(review): confirm).
    #[error("Permission denied: {0}")]
    PermissionDenied(String),
}
152
153struct ResolvedDeleteTargets {
154    datasets_to_delete: Vec<Dataset>,
155    links_to_detach: Vec<(Uuid, Uuid)>,
156    candidate_data_ids: Vec<Uuid>,
157}
158
159pub struct DeleteService {
160    storage: Arc<dyn StorageTrait>,
161    database: Arc<dyn DeleteDb>,
162    graph_db: Option<Arc<dyn GraphDBTrait>>,
163    vector_db: Option<Arc<dyn VectorDB>>,
164    session_store: Option<Arc<dyn SessionStore>>,
165    pipeline_run_repo: Option<Arc<dyn PipelineRunRepository>>,
166}
167
168impl DeleteService {
169    pub fn new(storage: Arc<dyn StorageTrait>, database: Arc<dyn DeleteDb>) -> Self {
170        Self {
171            storage,
172            database,
173            graph_db: None,
174            vector_db: None,
175            session_store: None,
176            pipeline_run_repo: None,
177        }
178    }
179
180    pub fn with_graph_db(mut self, graph_db: Arc<dyn GraphDBTrait>) -> Self {
181        self.graph_db = Some(graph_db);
182        self
183    }
184
185    pub fn with_vector_db(mut self, vector_db: Arc<dyn VectorDB>) -> Self {
186        self.vector_db = Some(vector_db);
187        self
188    }
189
190    pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
191        self.session_store = Some(store);
192        self
193    }
194
195    /// Wire a [`PipelineRunRepository`] so the dataset-scoped delete path
196    /// writes a fresh `INITIATED` row for every `(dataset_id, pipeline_name)`
197    /// pair before tearing the dataset down. This matches Python's prune
198    /// chain, which fires `reset_dataset_pipeline_run_status` so a future
199    /// re-cognify is not short-circuited by
200    /// `check_pipeline_run_qualification` (task 08-08).
201    ///
202    /// When unset (default for back-compat / mock paths) the reset is a
203    /// no-op.
204    pub fn with_pipeline_run_repo(mut self, repo: Arc<dyn PipelineRunRepository>) -> Self {
205        self.pipeline_run_repo = Some(repo);
206        self
207    }
208
209    #[tracing::instrument(
210        name = "cognee.delete.preview",
211        skip(self, request),
212        fields(
213            cognee.forget.target = %scope_label(&request.scope),
214            cognee.result.count = tracing::field::Empty,
215        )
216    )]
217    pub async fn preview(&self, request: &DeleteRequest) -> Result<DeletePreview, DeleteError> {
218        let targets = self.resolve_targets(request).await?;
219        let data_to_delete = self
220            .count_data_that_would_be_deleted(&targets.candidate_data_ids, &targets.links_to_detach)
221            .await?;
222
223        // Count graph nodes, vector points, and provenance rows from tables
224        let (graph_node_count, vector_point_count, prov_node_count, prov_edge_count) =
225            self.count_graph_vector_artifacts(&targets).await?;
226
227        // Count search history queries that would be deleted
228        let search_queries_to_delete = match &request.scope {
229            DeleteScope::User { owner_id } => self
230                .database
231                .count_search_history_for_user(*owner_id)
232                .await
233                .map_err(|e| {
234                    DeleteError::Runtime(format!(
235                        "Failed to count search history for user {owner_id}: {e}"
236                    ))
237                })? as usize,
238            DeleteScope::All => self
239                .database
240                .count_all_search_history()
241                .await
242                .map_err(|e| {
243                    DeleteError::Runtime(format!("Failed to count all search history: {e}"))
244                })? as usize,
245            DeleteScope::Data { .. } | DeleteScope::Dataset { .. } => 0,
246        };
247
248        tracing::Span::current().record("cognee.result.count", data_to_delete);
249
250        info!(
251            datasets = targets.datasets_to_delete.len(),
252            links = targets.links_to_detach.len(),
253            data = data_to_delete,
254            graph_nodes = graph_node_count,
255            vector_points = vector_point_count,
256            "delete preview computed"
257        );
258
259        Ok(DeletePreview {
260            datasets_to_delete: targets.datasets_to_delete.len(),
261            dataset_links_to_delete: targets.links_to_detach.len(),
262            data_to_delete,
263            storage_files_to_delete: data_to_delete,
264            graph_nodes_to_delete: graph_node_count,
265            vector_points_to_delete: vector_point_count,
266            provenance_nodes_to_delete: prov_node_count,
267            provenance_edges_to_delete: prov_edge_count,
268            search_queries_to_delete,
269            // Orphaned EdgeType count is only known at execution time (after
270            // graph nodes are deleted and edges disappear), so preview reports 0.
271            orphaned_edge_types_to_delete: 0,
272        })
273    }
274
275    #[tracing::instrument(
276        name = "cognee.delete.execute",
277        skip(self, request),
278        fields(
279            cognee.forget.target = %scope_label(&request.scope),
280            cognee.operation.mode = %mode_label(&request.mode),
281            cognee.result.count = tracing::field::Empty,
282        )
283    )]
284    pub async fn execute(&self, request: &DeleteRequest) -> Result<DeleteResult, DeleteError> {
285        let targets = self.resolve_targets(request).await?;
286
287        info!(
288            datasets = targets.datasets_to_delete.len(),
289            links = targets.links_to_detach.len(),
290            data_candidates = targets.candidate_data_ids.len(),
291            "delete targets resolved"
292        );
293
294        let mut warnings = Vec::new();
295        let mut deleted_links = 0usize;
296        let mut deleted_datasets = 0usize;
297        let mut deleted_data = 0usize;
298        let mut deleted_storage = 0usize;
299        let mut deleted_graph_nodes = 0usize;
300        let mut deleted_vector_points = 0usize;
301        let mut deleted_provenance_nodes = 0usize;
302        let mut deleted_provenance_edges = 0usize;
303        let mut deleted_pipeline_runs = 0usize;
304        let mut cleared_pipeline_statuses = 0usize;
305
306        // ------------------------------------------------------------------
307        // Memory-only path: wipe graph+vector, reset only cognify pipeline,
308        // preserve relational rows and files.
309        // ------------------------------------------------------------------
310        if request.memory_only {
311            return self.execute_memory_only(request, &targets).await;
312        }
313
314        // ------------------------------------------------------------------
315        // Phase 0: Pipeline status cleanup (while junction rows still exist)
316        // ------------------------------------------------------------------
317        // Clear pipeline_status JSON entries for datasets about to be deleted.
318        // This must run before junction rows (dataset_data) are removed, since
319        // the junction is needed to find related Data records.
320
321        for dataset in &targets.datasets_to_delete {
322            let count = self
323                .database
324                .clear_pipeline_status_for_dataset(dataset.id)
325                .await
326                .map_err(|error| {
327                    DeleteError::Runtime(format!(
328                        "Failed to clear pipeline_status for dataset '{}': {error}",
329                        dataset.name
330                    ))
331                })?;
332            cleared_pipeline_statuses += count;
333
334            // Python parity: writing a fresh `INITIATED` row for every
335            // pipeline registered against this dataset invalidates any prior
336            // `COMPLETED` row so a future re-cognify is not short-circuited
337            // by `check_pipeline_run_qualification` (task 08-08). The rows
338            // outlive the dataset itself — once the FK is dropped (gap 08-01)
339            // the orphans are harmless and surface in
340            // `list_recent_with_attribution` with `dataset_name = None`.
341            // See [docs/telemetry/08/05-reset-helpers.md §4.3] for the
342            // orphan-row decision.
343            self.reset_dataset_pipeline_run_status(dataset.owner_id, dataset.id)
344                .await?;
345        }
346
347        // For data-scoped deletion, clear pipeline_status for each affected
348        // dataset (the data item's pipeline_status entries keyed by dataset_id).
349        if matches!(request.scope, DeleteScope::Data { .. }) {
350            // Collect unique dataset IDs from links_to_detach (that are NOT in
351            // datasets_to_delete, which were already handled above).
352            let already_handled: HashSet<Uuid> =
353                targets.datasets_to_delete.iter().map(|d| d.id).collect();
354            let mut affected_dataset_ids: HashSet<Uuid> = HashSet::new();
355            for (dataset_id, _) in &targets.links_to_detach {
356                if !already_handled.contains(dataset_id) {
357                    affected_dataset_ids.insert(*dataset_id);
358                }
359            }
360            for dataset_id in affected_dataset_ids {
361                let count = self
362                    .database
363                    .clear_pipeline_status_for_dataset(dataset_id)
364                    .await
365                    .map_err(|error| {
366                        DeleteError::Runtime(format!(
367                            "Failed to clear pipeline_status for dataset {dataset_id}: {error}"
368                        ))
369                    })?;
370                cleared_pipeline_statuses += count;
371            }
372        }
373
374        // ------------------------------------------------------------------
375        // Phase 1: Graph/vector cleanup (before relational provenance is gone)
376        // ------------------------------------------------------------------
377
378        let is_all_scope = matches!(request.scope, DeleteScope::All);
379
380        if is_all_scope {
381            // Fast-path: wipe entire graph and all vector collections
382            let (gn, vp, pn, pe) = self.count_graph_vector_artifacts(&targets).await?;
383            let (_, _, _, _, gv_warnings) = self.cleanup_all().await?;
384            deleted_graph_nodes += gn;
385            deleted_vector_points += vp;
386            deleted_provenance_nodes += pn;
387            deleted_provenance_edges += pe;
388            warnings.extend(gv_warnings);
389        } else {
390            // Dataset-scoped cleanup
391            for dataset in &targets.datasets_to_delete {
392                let (gn, vp, pn, pe, gv_warnings) = self.cleanup_dataset(dataset.id).await?;
393                deleted_graph_nodes += gn;
394                deleted_vector_points += vp;
395                deleted_provenance_nodes += pn;
396                deleted_provenance_edges += pe;
397                warnings.extend(gv_warnings);
398            }
399
400            // Data-scoped cleanup (for single data item deletion)
401            if matches!(request.scope, DeleteScope::Data { .. }) {
402                // Compute which data IDs will actually be deleted
403                let deletable_data_ids = self.compute_deletable_data_ids(&targets).await?;
404
405                for data_id in &deletable_data_ids {
406                    for (dataset_id, did) in &targets.links_to_detach {
407                        if did == data_id {
408                            let (gn, vp, pn, pe, gv_warnings) =
409                                self.cleanup_data(*data_id, *dataset_id).await?;
410                            deleted_graph_nodes += gn;
411                            deleted_vector_points += vp;
412                            deleted_provenance_nodes += pn;
413                            deleted_provenance_edges += pe;
414                            warnings.extend(gv_warnings);
415                        }
416                    }
417                }
418            }
419        }
420
421        info!(
422            deleted_graph_nodes,
423            deleted_vector_points,
424            deleted_provenance_nodes,
425            deleted_provenance_edges,
426            "phase 1: graph/vector cleanup completed"
427        );
428
429        // ------------------------------------------------------------------
430        // Phase 2: Relational cleanup (links, datasets, data, storage)
431        // ------------------------------------------------------------------
432
433        for (dataset_id, data_id) in &targets.links_to_detach {
434            self.database
435                .detach_data_from_dataset(*dataset_id, *data_id)
436                .await
437                .map_err(|error| {
438                    DeleteError::Runtime(format!(
439                        "Failed to detach data {data_id} from dataset {dataset_id}: {error}"
440                    ))
441                })?;
442            deleted_links += 1;
443        }
444
445        // For data-scoped deletion, invalidate the pipeline cache for each
446        // affected dataset after detaching. This ensures re-running cognify
447        // will reprocess the dataset since its data composition has changed.
448        if matches!(request.scope, DeleteScope::Data { .. }) {
449            let mut invalidated_datasets: HashSet<Uuid> = HashSet::new();
450            for (dataset_id, _) in &targets.links_to_detach {
451                if invalidated_datasets.insert(*dataset_id) {
452                    let count = self
453                        .database
454                        .delete_pipeline_runs_by_dataset(*dataset_id)
455                        .await
456                        .map_err(|error| {
457                            DeleteError::Runtime(format!(
458                                "Failed to delete pipeline_runs for dataset {dataset_id}: {error}"
459                            ))
460                        })?;
461                    deleted_pipeline_runs += count as usize;
462                }
463            }
464        }
465
466        for dataset in &targets.datasets_to_delete {
467            self.database
468                .delete_dataset(dataset.id)
469                .await
470                .map_err(|error| {
471                    DeleteError::Runtime(format!(
472                        "Failed to delete dataset '{}': {error}",
473                        dataset.name
474                    ))
475                })?;
476            deleted_datasets += 1;
477        }
478
479        for data_id in &targets.candidate_data_ids {
480            let remaining_links = self
481                .database
482                .count_data_dataset_links(*data_id)
483                .await
484                .map_err(|error| {
485                    DeleteError::Runtime(format!(
486                        "Failed to count links for data {data_id}: {error}"
487                    ))
488                })?;
489
490            if remaining_links > 0 {
491                continue;
492            }
493
494            let data = self.database.get_data(*data_id).await.map_err(|error| {
495                DeleteError::Runtime(format!("Failed to fetch data {data_id}: {error}"))
496            })?;
497
498            if let Some(data) = data {
499                match self.storage.delete(&data.raw_data_location).await {
500                    Ok(()) => {
501                        deleted_storage += 1;
502                    }
503                    Err(StorageError::NotFound(_)) => {
504                        warn!(
505                            data_id = %data.id,
506                            location = %data.raw_data_location,
507                            "storage file already missing"
508                        );
509                        warnings.push(format!(
510                            "Storage file already missing for data {} at '{}'",
511                            data.id, data.raw_data_location
512                        ));
513                    }
514                    Err(error) => {
515                        return Err(DeleteError::Runtime(format!(
516                            "Failed to delete storage for data {}: {}",
517                            data.id, error
518                        )));
519                    }
520                }
521            }
522
523            self.database.delete_data(*data_id).await.map_err(|error| {
524                DeleteError::Runtime(format!("Failed to delete data {data_id}: {error}"))
525            })?;
526            deleted_data += 1;
527        }
528
529        info!(
530            deleted_links,
531            deleted_datasets,
532            deleted_data,
533            deleted_storage,
534            "phase 2: relational cleanup completed"
535        );
536
537        // ------------------------------------------------------------------
538        // Phase 3: Hard-mode orphan sweep (degree-one Entity/EntityType nodes)
539        // ------------------------------------------------------------------
540        //
541        // The degree-based sweep is gated to Hard mode to match Python, whose
542        // degree-one Entity/EntityType sweep is itself `if mode == "hard"`
543        // (legacy_delete.py) and is never reached by the production soft path
544        // (datasets.delete_data → legacy_delete(data, "soft")). Running it on
545        // Soft as well would make Rust soft-delete *more* destructive than
546        // Python (it would remove degree-one nodes Python preserves).
547        //
548        // TODO(B6.4): Python's main soft path still prunes nodes that become
549        // orphaned by the deletion via a provenance/slug-scoped traversal
550        // (delete_from_graph_and_vector excludes co-owned slugs), not a global
551        // degree sweep. Rust currently leaves those orphans on soft delete.
552        // Closing this gap requires a deletion-scoped cleanup rather than the
553        // global degree heuristic; tracked for a follow-up.
554
555        let mut deleted_orphan_entities = 0usize;
556        let mut deleted_orphan_entity_types = 0usize;
557        let mut deleted_orphan_edge_types = 0usize;
558
559        if matches!(request.mode, DeleteMode::Hard) {
560            let (oe, oet, sweep_warnings) = self.sweep_orphan_nodes().await?;
561            deleted_orphan_entities = oe;
562            deleted_orphan_entity_types = oet;
563            warnings.extend(sweep_warnings);
564
565            let (oedge, edge_sweep_warnings) = self.sweep_orphan_edge_types().await?;
566            deleted_orphan_edge_types = oedge;
567            warnings.extend(edge_sweep_warnings);
568        }
569
570        // ------------------------------------------------------------------
571        // Phase 4: Search history cleanup
572        // ------------------------------------------------------------------
573
574        let deleted_search_queries = match &request.scope {
575            DeleteScope::User { owner_id } => self
576                .database
577                .delete_search_history_for_user(*owner_id)
578                .await
579                .map_err(|e| {
580                    DeleteError::Runtime(format!(
581                        "Failed to delete search history for user {owner_id}: {e}"
582                    ))
583                })? as usize,
584            DeleteScope::All => self
585                .database
586                .delete_all_search_history()
587                .await
588                .map_err(|e| {
589                    DeleteError::Runtime(format!("Failed to delete all search history: {e}"))
590                })? as usize,
591            // Data/Dataset scopes: no-op (no dataset_id on queries table)
592            DeleteScope::Data { .. } | DeleteScope::Dataset { .. } => 0,
593        };
594
595        // ------------------------------------------------------------------
596        // Phase 5: Session cache cleanup
597        // ------------------------------------------------------------------
598
599        let mut pruned_sessions = false;
600        if matches!(request.scope, DeleteScope::All)
601            && let Some(session_store) = &self.session_store
602        {
603            session_store
604                .prune()
605                .await
606                .map_err(|e| DeleteError::Runtime(format!("Failed to prune session cache: {e}")))?;
607            pruned_sessions = true;
608        }
609
610        let total_deleted =
611            deleted_datasets + deleted_data + deleted_graph_nodes + deleted_vector_points;
612        tracing::Span::current().record("cognee.result.count", total_deleted);
613
614        info!(
615            deleted_datasets,
616            deleted_links,
617            deleted_data,
618            deleted_storage,
619            deleted_graph_nodes,
620            deleted_vector_points,
621            deleted_orphan_entities,
622            deleted_orphan_entity_types,
623            deleted_orphan_edge_types,
624            warning_count = warnings.len(),
625            "delete execution completed"
626        );
627
628        Ok(DeleteResult {
629            deleted_datasets,
630            deleted_dataset_links: deleted_links,
631            deleted_data,
632            deleted_storage_files: deleted_storage,
633            deleted_graph_nodes,
634            deleted_vector_points,
635            deleted_provenance_nodes,
636            deleted_provenance_edges,
637            deleted_orphan_entities,
638            deleted_orphan_entity_types,
639            deleted_orphan_edge_types,
640            deleted_pipeline_runs,
641            cleared_pipeline_statuses,
642            deleted_search_queries,
643            pruned_sessions,
644            warnings,
645        })
646    }
647
648    async fn execute_memory_only(
649        &self,
650        request: &DeleteRequest,
651        targets: &ResolvedDeleteTargets,
652    ) -> Result<DeleteResult, DeleteError> {
653        let mut warnings = Vec::new();
654        let mut deleted_graph_nodes = 0usize;
655        let mut deleted_vector_points = 0usize;
656        let mut deleted_provenance_nodes = 0usize;
657        let mut deleted_provenance_edges = 0usize;
658
659        // Phase 1: Graph/vector cleanup only (same logic as normal execute).
660        let is_all_scope = matches!(request.scope, DeleteScope::All);
661
662        if is_all_scope {
663            let (gn, vp, pn, pe) = self.count_graph_vector_artifacts(targets).await?;
664            let (_, _, _, _, gv_warnings) = self.cleanup_all().await?;
665            deleted_graph_nodes += gn;
666            deleted_vector_points += vp;
667            deleted_provenance_nodes += pn;
668            deleted_provenance_edges += pe;
669            warnings.extend(gv_warnings);
670        } else {
671            for dataset in &targets.datasets_to_delete {
672                let (gn, vp, pn, pe, gv_warnings) = self.cleanup_dataset(dataset.id).await?;
673                deleted_graph_nodes += gn;
674                deleted_vector_points += vp;
675                deleted_provenance_nodes += pn;
676                deleted_provenance_edges += pe;
677                warnings.extend(gv_warnings);
678            }
679
680            if matches!(request.scope, DeleteScope::Data { .. }) {
681                let deletable_data_ids = self.compute_deletable_data_ids(targets).await?;
682                for data_id in &deletable_data_ids {
683                    for (dataset_id, did) in &targets.links_to_detach {
684                        if did == data_id {
685                            let (gn, vp, pn, pe, gv_warnings) =
686                                self.cleanup_data(*data_id, *dataset_id).await?;
687                            deleted_graph_nodes += gn;
688                            deleted_vector_points += vp;
689                            deleted_provenance_nodes += pn;
690                            deleted_provenance_edges += pe;
691                            warnings.extend(gv_warnings);
692                        }
693                    }
694                }
695            }
696        }
697
698        // Phase 2: Pipeline-status reset.
699        //
700        // The dataset and data-item variants differ exactly as Python does:
701        //
702        // * Dataset variant (Python `_forget_dataset_memory`, forget.py:271-289):
703        //   on every Data record linked to the dataset, remove the dataset_id
704        //   entry from EVERY pipeline in `pipeline_status` (Python loops over
705        //   `list(pipeline_status.keys())`), then reset the dataset-level
706        //   *run* status for `cognify_pipeline` only.
707        //
708        // * Data-item variant (Python `_forget_data_memory`, forget.py:331-351):
709        //   remove only the `cognify_pipeline` entry for `(data_id, dataset_id)`
710        //   on that single Data record; NO dataset-level run-status reset.
711        if matches!(request.scope, DeleteScope::Data { .. }) {
712            // Data-item: surgically clear cognify status for the single data
713            // record in each affected (dataset_id, data_id) pair.
714            for (dataset_id, data_id) in &targets.links_to_detach {
715                self.database
716                    .clear_cognify_pipeline_status_for_data(*data_id, *dataset_id)
717                    .await
718                    .map_err(|e| {
719                        DeleteError::Runtime(format!(
720                            "Failed to clear cognify pipeline_status for data {data_id} in dataset {dataset_id}: {e}"
721                        ))
722                    })?;
723            }
724        } else {
725            // Dataset (and All): mirror Python's per-record all-pipeline key
726            // removal, then reset cognify run status at the dataset level.
727            for dataset in &targets.datasets_to_delete {
728                self.database
729                    .clear_pipeline_status_for_dataset(dataset.id)
730                    .await
731                    .map_err(|e| {
732                        DeleteError::Runtime(format!(
733                            "Failed to clear pipeline_status for dataset '{}': {e}",
734                            dataset.name
735                        ))
736                    })?;
737
738                self.reset_cognify_pipeline_run_status(dataset.owner_id, dataset.id)
739                    .await?;
740            }
741        }
742
743        info!(
744            deleted_graph_nodes,
745            deleted_vector_points, "memory-only delete completed"
746        );
747
748        Ok(DeleteResult {
749            deleted_datasets: 0,
750            deleted_dataset_links: 0,
751            deleted_data: 0,
752            deleted_storage_files: 0,
753            deleted_graph_nodes,
754            deleted_vector_points,
755            deleted_provenance_nodes,
756            deleted_provenance_edges,
757            deleted_orphan_entities: 0,
758            deleted_orphan_entity_types: 0,
759            deleted_orphan_edge_types: 0,
760            deleted_pipeline_runs: 0,
761            cleared_pipeline_statuses: 0,
762            deleted_search_queries: 0,
763            pruned_sessions: false,
764            warnings,
765        })
766    }
767
768    pub async fn data_ids_to_delete(
769        &self,
770        request: &DeleteRequest,
771    ) -> Result<Vec<Uuid>, DeleteError> {
772        let targets = self.resolve_targets(request).await?;
773        self.compute_deletable_data_ids(&targets).await
774    }
775
776    /// Internal Python-parity helper invoked from the dataset-deletion
777    /// path. Walks every distinct `pipeline_name` registered against
778    /// `dataset_id` and writes a fresh `INITIATED` row for each (skipping
779    /// pipelines already at `INITIATED`).
780    ///
781    /// No-op when no [`PipelineRunRepository`] was supplied via
782    /// [`Self::with_pipeline_run_repo`]; this keeps mock-only test paths
783    /// working without forcing them to wire a repo.
784    async fn reset_dataset_pipeline_run_status(
785        &self,
786        owner_id: Uuid,
787        dataset_id: Uuid,
788    ) -> Result<(), DeleteError> {
789        let Some(repo) = self.pipeline_run_repo.as_ref() else {
790            return Ok(());
791        };
792
793        let runs = repo
794            .get_pipeline_runs_by_dataset(dataset_id)
795            .await
796            .map_err(|e| {
797                DeleteError::Runtime(format!(
798                    "Failed to list pipeline runs for dataset {dataset_id}: {e}"
799                ))
800            })?;
801
802        for run in runs {
803            if matches!(run.status, PipelineRunStatus::Initiated) {
804                // Python skips runs already pending to avoid duplicate rows.
805                continue;
806            }
807            let name = run.pipeline_name;
808            let pid = pipeline_id(owner_id, dataset_id, &name);
809            let prid = pipeline_run_id(pid, dataset_id);
810            repo.log_pipeline_run(
811                prid,
812                pid,
813                &name,
814                Some(dataset_id),
815                PipelineRunStatus::Initiated,
816                Some(run_info_for_initiated()),
817            )
818            .await
819            .map_err(|e| {
820                DeleteError::Runtime(format!(
821                    "Failed to reset pipeline '{name}' for dataset {dataset_id}: {e}"
822                ))
823            })?;
824        }
825        Ok(())
826    }
827
828    /// Like [`reset_dataset_pipeline_run_status`] but only resets the specified
829    /// pipeline by name. Used by memory-only forget to reset `cognify_pipeline`
830    /// without touching `add_pipeline` — matching Python's
831    /// `reset_dataset_pipeline_run_status(pipeline_names=["cognify_pipeline"])`.
832    async fn reset_cognify_pipeline_run_status(
833        &self,
834        owner_id: Uuid,
835        dataset_id: Uuid,
836    ) -> Result<(), DeleteError> {
837        let Some(repo) = self.pipeline_run_repo.as_ref() else {
838            return Ok(());
839        };
840
841        let runs = repo
842            .get_pipeline_runs_by_dataset(dataset_id)
843            .await
844            .map_err(|e| {
845                DeleteError::Runtime(format!(
846                    "Failed to list pipeline runs for dataset {dataset_id}: {e}"
847                ))
848            })?;
849
850        for run in runs {
851            if run.pipeline_name != "cognify_pipeline" {
852                continue;
853            }
854            if matches!(run.status, PipelineRunStatus::Initiated) {
855                continue;
856            }
857            let name = run.pipeline_name;
858            let pid = pipeline_id(owner_id, dataset_id, &name);
859            let prid = pipeline_run_id(pid, dataset_id);
860            repo.log_pipeline_run(
861                prid,
862                pid,
863                &name,
864                Some(dataset_id),
865                PipelineRunStatus::Initiated,
866                Some(run_info_for_initiated()),
867            )
868            .await
869            .map_err(|e| {
870                DeleteError::Runtime(format!(
871                    "Failed to reset cognify pipeline for dataset {dataset_id}: {e}"
872                ))
873            })?;
874        }
875        Ok(())
876    }
877
878    // ==================================================================
879    // Graph/vector cleanup helpers
880    // ==================================================================
881
882    /// Fast-path for `DeleteScope::All`: wipe entire graph and all vector
883    /// collections. Returns `(graph_nodes_deleted, vector_points_deleted,
884    /// provenance_nodes_deleted, provenance_edges_deleted, warnings)`.
885    ///
886    /// Note: provenance counts are 0 because the dataset cascade in phase 2
887    /// handles provenance row deletion.
888    async fn cleanup_all(&self) -> Result<(usize, usize, usize, usize, Vec<String>), DeleteError> {
889        let mut warnings = Vec::new();
890        let graph_nodes = 0usize;
891        let vector_points = 0usize;
892
893        // --- Graph ---
894        if let Some(graph_db) = &self.graph_db {
895            graph_db
896                .delete_graph()
897                .await
898                .map_err(|e| DeleteError::GraphCleanup(format!("Failed to delete graph: {e}")))?;
899            // We don't know exact count after delete_graph(); report 0 as "wiped".
900        } else {
901            warnings
902                .push("Graph DB not configured; graph artifacts were not cleaned up.".to_string());
903        }
904
905        // --- Vector ---
906        if let Some(vector_db) = &self.vector_db {
907            let mut collections = vector_db.list_collections().await.map_err(|e| {
908                DeleteError::VectorCleanup(format!("Failed to list vector collections: {e}"))
909            })?;
910
911            if collections.is_empty() {
912                // Fallback to known list
913                collections = FALLBACK_VECTOR_COLLECTIONS
914                    .iter()
915                    .map(|(dt, fn_)| (dt.to_string(), fn_.to_string()))
916                    .collect();
917            }
918
919            for (data_type, field_name) in &collections {
920                let exists = vector_db
921                    .has_collection(data_type, field_name)
922                    .await
923                    .map_err(|e| {
924                        DeleteError::VectorCleanup(format!(
925                            "Failed to check vector collection {data_type}_{field_name}: {e}"
926                        ))
927                    })?;
928
929                if exists {
930                    vector_db
931                        .delete_collection(data_type, field_name)
932                        .await
933                        .map_err(|e| {
934                            DeleteError::VectorCleanup(format!(
935                                "Failed to delete vector collection {data_type}_{field_name}: {e}"
936                            ))
937                        })?;
938                }
939            }
940        } else {
941            warnings.push(
942                "Vector DB not configured; vector artifacts were not cleaned up.".to_string(),
943            );
944        }
945
946        Ok((graph_nodes, vector_points, 0, 0, warnings))
947    }
948
949    /// Dataset-scoped cleanup: remove graph nodes and vector points based on
950    /// the provenance `nodes`/`edges` tables. Returns `(graph_nodes_deleted,
951    /// vector_points_deleted, provenance_nodes_deleted, provenance_edges_deleted,
952    /// warnings)`.
953    async fn cleanup_dataset(
954        &self,
955        dataset_id: Uuid,
956    ) -> Result<(usize, usize, usize, usize, Vec<String>), DeleteError> {
957        let mut warnings = Vec::new();
958        let mut graph_node_count = 0usize;
959        let mut vector_point_count = 0usize;
960
961        let nodes = self
962            .database
963            .get_nodes_by_dataset(dataset_id)
964            .await
965            .map_err(|e| {
966                DeleteError::Runtime(format!(
967                    "Failed to get provenance nodes for dataset {dataset_id}: {e}"
968                ))
969            })?;
970
971        let edges = self
972            .database
973            .get_edges_by_dataset(dataset_id)
974            .await
975            .map_err(|e| {
976                DeleteError::Runtime(format!(
977                    "Failed to get provenance edges for dataset {dataset_id}: {e}"
978                ))
979            })?;
980
981        let prov_node_count = nodes.len();
982        let prov_edge_count = edges.len();
983
984        if nodes.is_empty() && edges.is_empty() {
985            return Ok((0, 0, 0, 0, warnings));
986        }
987
988        // --- Graph cleanup ---
989        let (gn, gw) = self.delete_graph_artifacts(&nodes).await?;
990        graph_node_count += gn;
991        warnings.extend(gw);
992
993        // --- Vector cleanup ---
994        let (vp, vw) = self.delete_vector_artifacts(&nodes, &edges).await?;
995        vector_point_count += vp;
996        warnings.extend(vw);
997
998        // --- Provenance cleanup ---
999        // Note: for dataset deletion, the FK CASCADE on dataset_id will handle
1000        // this automatically when the dataset record is deleted. But we call
1001        // it explicitly to be safe and consistent.
1002        self.database
1003            .delete_provenance_edges_for_dataset(dataset_id)
1004            .await
1005            .map_err(|e| {
1006                DeleteError::Runtime(format!(
1007                    "Failed to delete provenance edges for dataset {dataset_id}: {e}"
1008                ))
1009            })?;
1010        self.database
1011            .delete_provenance_nodes_for_dataset(dataset_id)
1012            .await
1013            .map_err(|e| {
1014                DeleteError::Runtime(format!(
1015                    "Failed to delete provenance nodes for dataset {dataset_id}: {e}"
1016                ))
1017            })?;
1018
1019        Ok((
1020            graph_node_count,
1021            vector_point_count,
1022            prov_node_count,
1023            prov_edge_count,
1024            warnings,
1025        ))
1026    }
1027
1028    /// Data-scoped cleanup: remove graph nodes and vector points for a single
1029    /// data item, using only non-shared slugs. Returns `(graph_nodes_deleted,
1030    /// vector_points_deleted, provenance_nodes_deleted, provenance_edges_deleted,
1031    /// warnings)`.
1032    ///
1033    /// Note: `provenance_nodes_deleted` and `provenance_edges_deleted` count ALL
1034    /// provenance rows for this `(data_id, dataset_id)` pair, not just the
1035    /// unique ones used for graph/vector cleanup.
1036    async fn cleanup_data(
1037        &self,
1038        data_id: Uuid,
1039        dataset_id: Uuid,
1040    ) -> Result<(usize, usize, usize, usize, Vec<String>), DeleteError> {
1041        let mut warnings = Vec::new();
1042        let mut graph_node_count = 0usize;
1043        let mut vector_point_count = 0usize;
1044
1045        let nodes = self
1046            .database
1047            .get_unique_nodes_for_data(data_id, dataset_id)
1048            .await
1049            .map_err(|e| {
1050                DeleteError::Runtime(format!(
1051                    "Failed to get unique provenance nodes for data {data_id}: {e}"
1052                ))
1053            })?;
1054
1055        let edges = self
1056            .database
1057            .get_unique_edges_for_data(data_id, dataset_id)
1058            .await
1059            .map_err(|e| {
1060                DeleteError::Runtime(format!(
1061                    "Failed to get unique provenance edges for data {data_id}: {e}"
1062                ))
1063            })?;
1064
1065        // Count ALL provenance rows (not just unique) before deletion, because
1066        // we delete all rows for this (data_id, dataset_id) pair.
1067        let all_prov_nodes = self
1068            .database
1069            .get_provenance_node_count_for_data(data_id, dataset_id)
1070            .await
1071            .map_err(|e| {
1072                DeleteError::Runtime(format!(
1073                    "Failed to count provenance nodes for data {data_id}: {e}"
1074                ))
1075            })?;
1076        let all_prov_edges = self
1077            .database
1078            .get_provenance_edge_count_for_data(data_id, dataset_id)
1079            .await
1080            .map_err(|e| {
1081                DeleteError::Runtime(format!(
1082                    "Failed to count provenance edges for data {data_id}: {e}"
1083                ))
1084            })?;
1085
1086        if nodes.is_empty() && edges.is_empty() && all_prov_nodes == 0 && all_prov_edges == 0 {
1087            return Ok((0, 0, 0, 0, warnings));
1088        }
1089
1090        // --- Graph cleanup ---
1091        let (gn, gw) = self.delete_graph_artifacts(&nodes).await?;
1092        graph_node_count += gn;
1093        warnings.extend(gw);
1094
1095        // --- Vector cleanup ---
1096        let (vp, vw) = self.delete_vector_artifacts(&nodes, &edges).await?;
1097        vector_point_count += vp;
1098        warnings.extend(vw);
1099
1100        // --- Provenance cleanup ---
1101        self.database
1102            .delete_provenance_edges_for_data(data_id, dataset_id)
1103            .await
1104            .map_err(|e| {
1105                DeleteError::Runtime(format!(
1106                    "Failed to delete provenance edges for data {data_id}: {e}"
1107                ))
1108            })?;
1109        self.database
1110            .delete_provenance_nodes_for_data(data_id, dataset_id)
1111            .await
1112            .map_err(|e| {
1113                DeleteError::Runtime(format!(
1114                    "Failed to delete provenance nodes for data {data_id}: {e}"
1115                ))
1116            })?;
1117
1118        Ok((
1119            graph_node_count,
1120            vector_point_count,
1121            all_prov_nodes,
1122            all_prov_edges,
1123            warnings,
1124        ))
1125    }
1126
1127    /// Delete nodes from the graph DB based on provenance node slugs.
1128    /// Returns `(count, warnings)`.
1129    async fn delete_graph_artifacts(
1130        &self,
1131        nodes: &[GraphNode],
1132    ) -> Result<(usize, Vec<String>), DeleteError> {
1133        let mut warnings = Vec::new();
1134
1135        if let Some(graph_db) = &self.graph_db {
1136            let node_ids: Vec<String> = nodes
1137                .iter()
1138                .map(|n| n.slug.to_string())
1139                .collect::<HashSet<_>>()
1140                .into_iter()
1141                .collect();
1142
1143            if !node_ids.is_empty() {
1144                graph_db.delete_nodes(&node_ids).await.map_err(|e| {
1145                    DeleteError::GraphCleanup(format!(
1146                        "Failed to delete {} graph nodes: {e}",
1147                        node_ids.len()
1148                    ))
1149                })?;
1150            }
1151
1152            Ok((node_ids.len(), warnings))
1153        } else {
1154            if !nodes.is_empty() {
1155                warnings.push(
1156                    "Graph DB not configured; graph artifacts were not cleaned up.".to_string(),
1157                );
1158            }
1159            Ok((0, warnings))
1160        }
1161    }
1162
1163    /// Delete vector points based on provenance nodes and edges.
1164    /// Returns `(count, warnings)`.
1165    async fn delete_vector_artifacts(
1166        &self,
1167        nodes: &[GraphNode],
1168        edges: &[GraphEdge],
1169    ) -> Result<(usize, Vec<String>), DeleteError> {
1170        let mut warnings = Vec::new();
1171        let mut total_deleted = 0usize;
1172
1173        if let Some(vector_db) = &self.vector_db {
1174            // Group nodes by (node_type, field_name) for batched deletion
1175            let mut by_collection: HashMap<(String, String), Vec<Uuid>> = HashMap::new();
1176
1177            for node in nodes {
1178                let fields = parse_indexed_fields(&node.indexed_fields);
1179                for field_name in fields {
1180                    by_collection
1181                        .entry((node.node_type.clone(), field_name))
1182                        .or_default()
1183                        .push(node.slug);
1184                }
1185            }
1186
1187            // Edges contribute to EdgeType_relationship_name and Triplet_text.
1188            for edge in edges {
1189                by_collection
1190                    .entry(("EdgeType".to_string(), "relationship_name".to_string()))
1191                    .or_default()
1192                    .push(EdgeType::deterministic_id(&edge.relationship_name));
1193
1194                by_collection
1195                    .entry(("Triplet".to_string(), "text".to_string()))
1196                    .or_default()
1197                    .push(triplet_vector_id(edge));
1198            }
1199
1200            for ((data_type, field_name), ids) in &by_collection {
1201                if ids.is_empty() {
1202                    continue;
1203                }
1204
1205                let exists = vector_db
1206                    .has_collection(data_type, field_name)
1207                    .await
1208                    .map_err(|e| {
1209                        DeleteError::VectorCleanup(format!(
1210                            "Failed to check vector collection {data_type}_{field_name}: {e}"
1211                        ))
1212                    })?;
1213
1214                if exists {
1215                    vector_db
1216                        .delete_points(data_type, field_name, ids)
1217                        .await
1218                        .map_err(|e| {
1219                            DeleteError::VectorCleanup(format!(
1220                                "Failed to delete vector points from {data_type}_{field_name}: {e}"
1221                            ))
1222                        })?;
1223                    total_deleted += ids.len();
1224                }
1225            }
1226
1227            Ok((total_deleted, warnings))
1228        } else {
1229            if !nodes.is_empty() || !edges.is_empty() {
1230                warnings.push(
1231                    "Vector DB not configured; vector artifacts were not cleaned up.".to_string(),
1232                );
1233            }
1234            Ok((0, warnings))
1235        }
1236    }
1237
1238    /// Hard-mode orphan sweep: find and delete degree-one Entity/EntityType
1239    /// nodes from the graph and their corresponding vector points.
1240    ///
1241    /// Returns `(orphan_entities, orphan_entity_types, warnings)`.
1242    async fn sweep_orphan_nodes(&self) -> Result<(usize, usize, Vec<String>), DeleteError> {
1243        let mut warnings = Vec::new();
1244
1245        let graph_db = match &self.graph_db {
1246            Some(db) => db,
1247            None => {
1248                warnings
1249                    .push("Graph DB not configured; hard-mode orphan sweep skipped.".to_string());
1250                return Ok((0, 0, warnings));
1251            }
1252        };
1253
1254        let orphan_entities = graph_db.get_degree_one_nodes("Entity").await.map_err(|e| {
1255            DeleteError::GraphCleanup(format!("Failed to query degree-one Entity nodes: {e}"))
1256        })?;
1257
1258        let orphan_types = graph_db
1259            .get_degree_one_nodes("EntityType")
1260            .await
1261            .map_err(|e| {
1262                DeleteError::GraphCleanup(format!(
1263                    "Failed to query degree-one EntityType nodes: {e}"
1264                ))
1265            })?;
1266
1267        let entity_count = orphan_entities.len();
1268        let type_count = orphan_types.len();
1269
1270        if entity_count == 0 && type_count == 0 {
1271            return Ok((0, 0, warnings));
1272        }
1273
1274        // Collect all orphan node IDs for graph deletion
1275        let all_orphan_ids: Vec<String> = orphan_entities
1276            .iter()
1277            .chain(orphan_types.iter())
1278            .map(|(id, _)| id.clone())
1279            .collect();
1280
1281        // Delete from vector DB (if configured)
1282        if let Some(vector_db) = &self.vector_db {
1283            // Entity orphans → Entity/name collection
1284            if !orphan_entities.is_empty() {
1285                let entity_uuids: Vec<Uuid> = orphan_entities
1286                    .iter()
1287                    .filter_map(|(id, _)| Uuid::parse_str(id).ok())
1288                    .collect();
1289                if !entity_uuids.is_empty()
1290                    && vector_db
1291                        .has_collection("Entity", "name")
1292                        .await
1293                        .unwrap_or(false)
1294                {
1295                    vector_db
1296                        .delete_points("Entity", "name", &entity_uuids)
1297                        .await
1298                        .map_err(|e| {
1299                            DeleteError::VectorCleanup(format!(
1300                                "Failed to delete orphan Entity vector points: {e}"
1301                            ))
1302                        })?;
1303                }
1304            }
1305
1306            // EntityType orphans → EntityType/name collection
1307            if !orphan_types.is_empty() {
1308                let type_uuids: Vec<Uuid> = orphan_types
1309                    .iter()
1310                    .filter_map(|(id, _)| Uuid::parse_str(id).ok())
1311                    .collect();
1312                if !type_uuids.is_empty()
1313                    && vector_db
1314                        .has_collection("EntityType", "name")
1315                        .await
1316                        .unwrap_or(false)
1317                {
1318                    vector_db
1319                        .delete_points("EntityType", "name", &type_uuids)
1320                        .await
1321                        .map_err(|e| {
1322                            DeleteError::VectorCleanup(format!(
1323                                "Failed to delete orphan EntityType vector points: {e}"
1324                            ))
1325                        })?;
1326                }
1327            }
1328        }
1329
1330        // Delete from graph DB
1331        graph_db.delete_nodes(&all_orphan_ids).await.map_err(|e| {
1332            DeleteError::GraphCleanup(format!("Failed to delete orphan graph nodes: {e}"))
1333        })?;
1334
1335        Ok((entity_count, type_count, warnings))
1336    }
1337
1338    /// Hard-mode orphan sweep for EdgeType nodes: find EdgeType graph nodes
1339    /// whose relationship name no longer appears in any graph edge, and delete
1340    /// them from both the graph and vector DBs.
1341    ///
1342    /// Returns `(deleted_count, warnings)`.
1343    async fn sweep_orphan_edge_types(&self) -> Result<(usize, Vec<String>), DeleteError> {
1344        let mut warnings = Vec::new();
1345
1346        let graph_db = match &self.graph_db {
1347            Some(db) => db,
1348            None => {
1349                warnings
1350                    .push("Graph DB not configured; orphan EdgeType sweep skipped.".to_string());
1351                return Ok((0, warnings));
1352            }
1353        };
1354
1355        let orphan_edge_types = match graph_db.get_zero_degree_edge_type_nodes().await {
1356            Ok(nodes) => nodes,
1357            Err(e) => {
1358                warnings.push(format!(
1359                    "Failed to query orphan EdgeType nodes (non-fatal): {e}"
1360                ));
1361                return Ok((0, warnings));
1362            }
1363        };
1364
1365        let count = orphan_edge_types.len();
1366        if count == 0 {
1367            return Ok((0, warnings));
1368        }
1369
1370        // Delete from vector DB (if configured) — non-fatal
1371        if let Some(vector_db) = &self.vector_db {
1372            let uuids: Vec<Uuid> = orphan_edge_types
1373                .iter()
1374                .filter_map(|(id, _)| Uuid::parse_str(id).ok())
1375                .collect();
1376
1377            if !uuids.is_empty() {
1378                let has_collection = vector_db
1379                    .has_collection("EdgeType", "relationship_name")
1380                    .await
1381                    .unwrap_or(false);
1382                if has_collection
1383                    && let Err(e) = vector_db
1384                        .delete_points("EdgeType", "relationship_name", &uuids)
1385                        .await
1386                {
1387                    warnings.push(format!(
1388                        "Failed to delete orphan EdgeType vector points (non-fatal): {e}"
1389                    ));
1390                }
1391            }
1392        }
1393
1394        // Delete from graph DB
1395        let orphan_ids: Vec<String> = orphan_edge_types.iter().map(|(id, _)| id.clone()).collect();
1396
1397        if let Err(e) = graph_db.delete_nodes(&orphan_ids).await {
1398            warnings.push(format!(
1399                "Failed to delete orphan EdgeType graph nodes (non-fatal): {e}"
1400            ));
1401            return Ok((0, warnings));
1402        }
1403
1404        Ok((count, warnings))
1405    }
1406
1407    /// Count graph nodes, vector points, and provenance rows that would be
1408    /// affected. Returns `(graph_nodes, vector_points, prov_nodes, prov_edges)`.
1409    async fn count_graph_vector_artifacts(
1410        &self,
1411        targets: &ResolvedDeleteTargets,
1412    ) -> Result<(usize, usize, usize, usize), DeleteError> {
1413        let mut graph_nodes = 0usize;
1414        let mut vector_points = 0usize;
1415        let mut prov_nodes = 0usize;
1416        let mut prov_edges = 0usize;
1417
1418        for dataset in &targets.datasets_to_delete {
1419            let nodes = self
1420                .database
1421                .get_nodes_by_dataset(dataset.id)
1422                .await
1423                .map_err(|e| {
1424                    DeleteError::Runtime(format!(
1425                        "Failed to count provenance nodes for dataset {}: {e}",
1426                        dataset.id
1427                    ))
1428                })?;
1429
1430            let edges = self
1431                .database
1432                .get_edges_by_dataset(dataset.id)
1433                .await
1434                .map_err(|e| {
1435                    DeleteError::Runtime(format!(
1436                        "Failed to count provenance edges for dataset {}: {e}",
1437                        dataset.id
1438                    ))
1439                })?;
1440
1441            prov_nodes += nodes.len();
1442            prov_edges += edges.len();
1443
1444            // Unique node slugs = graph nodes to delete
1445            let unique_slugs: HashSet<Uuid> = nodes.iter().map(|n| n.slug).collect();
1446            graph_nodes += unique_slugs.len();
1447
1448            // Count vector points from indexed_fields
1449            for node in &nodes {
1450                let fields = parse_indexed_fields(&node.indexed_fields);
1451                vector_points += fields.len();
1452            }
1453            // Each edge contributes to EdgeType and Triplet collections
1454            vector_points += edges.len() * 2;
1455        }
1456
1457        Ok((graph_nodes, vector_points, prov_nodes, prov_edges))
1458    }
1459
1460    // ==================================================================
1461    // Target resolution
1462    // ==================================================================
1463
1464    async fn compute_deletable_data_ids(
1465        &self,
1466        targets: &ResolvedDeleteTargets,
1467    ) -> Result<Vec<Uuid>, DeleteError> {
1468        let mut links_to_remove_per_data: HashMap<Uuid, usize> = HashMap::new();
1469        for (_, data_id) in &targets.links_to_detach {
1470            *links_to_remove_per_data.entry(*data_id).or_insert(0) += 1;
1471        }
1472
1473        let mut deletable = Vec::new();
1474        for data_id in &targets.candidate_data_ids {
1475            let link_count = self
1476                .database
1477                .count_data_dataset_links(*data_id)
1478                .await
1479                .map_err(|error| {
1480                    DeleteError::Runtime(format!(
1481                        "Failed to count dataset links for data {data_id}: {error}"
1482                    ))
1483                })?;
1484            let to_remove = links_to_remove_per_data.get(data_id).copied().unwrap_or(0);
1485            if link_count <= to_remove {
1486                deletable.push(*data_id);
1487            }
1488        }
1489
1490        Ok(deletable)
1491    }
1492
1493    #[tracing::instrument(level = "debug", skip(self, request))]
1494    async fn resolve_targets(
1495        &self,
1496        request: &DeleteRequest,
1497    ) -> Result<ResolvedDeleteTargets, DeleteError> {
1498        match &request.scope {
1499            DeleteScope::Data {
1500                owner_id,
1501                data_id,
1502                dataset_name,
1503                delete_dataset_if_empty,
1504            } => {
1505                self.resolve_data_scope(
1506                    *owner_id,
1507                    *data_id,
1508                    dataset_name.as_deref(),
1509                    *delete_dataset_if_empty,
1510                )
1511                .await
1512            }
1513            DeleteScope::Dataset {
1514                owner_id,
1515                dataset_name,
1516            } => self.resolve_dataset_scope(*owner_id, dataset_name).await,
1517            DeleteScope::User { owner_id } => self.resolve_user_scope(*owner_id).await,
1518            DeleteScope::All => self.resolve_all_scope().await,
1519        }
1520    }
1521
1522    #[tracing::instrument(level = "debug", skip(self))]
1523    async fn resolve_data_scope(
1524        &self,
1525        owner_id: Uuid,
1526        data_id: Uuid,
1527        dataset_name: Option<&str>,
1528        delete_dataset_if_empty: bool,
1529    ) -> Result<ResolvedDeleteTargets, DeleteError> {
1530        let data = self.database.get_data(data_id).await.map_err(|error| {
1531            DeleteError::Runtime(format!("Failed to fetch data {data_id}: {error}"))
1532        })?;
1533
1534        // Python behaviour (datasets.py:165-176): when the Data row is absent the
1535        // caller may be using a custom graph model that didn't go through the
1536        // standard ingestion pipeline. In that case we still attempt graph/vector
1537        // cleanup using a minimal ghost targets struct (candidate_data_ids set, no
1538        // links to detach, no datasets to delete) and return success rather than
1539        // an error.
1540        let Some(data) = data else {
1541            tracing::warn!(
1542                data_id = %data_id,
1543                "Data row not found — assuming custom graph model; attempting orphan cleanup"
1544            );
1545            return Ok(ResolvedDeleteTargets {
1546                datasets_to_delete: Vec::new(),
1547                links_to_detach: Vec::new(),
1548                candidate_data_ids: vec![data_id],
1549            });
1550        };
1551
1552        if data.owner_id != owner_id {
1553            return Err(DeleteError::Validation(format!(
1554                "Data {data_id} does not belong to owner {owner_id}"
1555            )));
1556        }
1557
1558        let mut links_to_detach = Vec::new();
1559        // Collect affected datasets (with their data items) so we can check
1560        // emptiness when `delete_dataset_if_empty` is set.
1561        let mut affected_datasets: Vec<(cognee_models::Dataset, Vec<cognee_models::Data>)> =
1562            Vec::new();
1563
1564        if let Some(dataset_name) = dataset_name {
1565            let dataset = self
1566                .database
1567                .get_dataset_by_name(dataset_name, owner_id, None)
1568                .await
1569                .map_err(|error| {
1570                    DeleteError::Runtime(format!(
1571                        "Failed to resolve dataset '{dataset_name}': {error}"
1572                    ))
1573                })?
1574                .ok_or_else(|| {
1575                    DeleteError::Validation(format!(
1576                        "Dataset '{dataset_name}' was not found for owner {owner_id}"
1577                    ))
1578                })?;
1579
1580            let data_items = self
1581                .database
1582                .get_dataset_data(dataset.id)
1583                .await
1584                .map_err(|error| {
1585                    DeleteError::Runtime(format!(
1586                        "Failed to load data for dataset '{}': {}",
1587                        dataset.name, error
1588                    ))
1589                })?;
1590
1591            if !data_items.iter().any(|item| item.id == data_id) {
1592                return Err(DeleteError::Validation(format!(
1593                    "Data {} is not attached to dataset '{}'",
1594                    data_id, dataset.name
1595                )));
1596            }
1597
1598            links_to_detach.push((dataset.id, data_id));
1599            affected_datasets.push((dataset, data_items));
1600        } else {
1601            let datasets = self
1602                .database
1603                .list_datasets_for_data(data_id)
1604                .await
1605                .map_err(|error| {
1606                    DeleteError::Runtime(format!(
1607                        "Failed to list datasets for data {data_id}: {error}"
1608                    ))
1609                })?;
1610
1611            for dataset in datasets {
1612                if dataset.owner_id == owner_id {
1613                    links_to_detach.push((dataset.id, data_id));
1614                    if delete_dataset_if_empty {
1615                        let data_items =
1616                            self.database
1617                                .get_dataset_data(dataset.id)
1618                                .await
1619                                .map_err(|error| {
1620                                    DeleteError::Runtime(format!(
1621                                        "Failed to load data for dataset '{}': {}",
1622                                        dataset.name, error
1623                                    ))
1624                                })?;
1625                        affected_datasets.push((dataset, data_items));
1626                    }
1627                }
1628            }
1629
1630            if links_to_detach.is_empty() {
1631                return Err(DeleteError::Validation(format!(
1632                    "No dataset links found for data {data_id} and owner {owner_id}"
1633                )));
1634            }
1635        }
1636
1637        // When the flag is set, check each affected dataset: if it currently
1638        // has exactly one data item and that item is the one being removed,
1639        // mark the dataset for deletion.
1640        let mut datasets_to_delete = Vec::new();
1641        if delete_dataset_if_empty {
1642            for (dataset, data_items) in affected_datasets {
1643                if data_items.len() == 1 && data_items[0].id == data_id {
1644                    datasets_to_delete.push(dataset);
1645                }
1646            }
1647        }
1648
1649        Ok(ResolvedDeleteTargets {
1650            datasets_to_delete,
1651            links_to_detach,
1652            candidate_data_ids: vec![data_id],
1653        })
1654    }
1655
1656    #[tracing::instrument(level = "debug", skip(self))]
1657    async fn resolve_dataset_scope(
1658        &self,
1659        owner_id: Uuid,
1660        dataset_name: &str,
1661    ) -> Result<ResolvedDeleteTargets, DeleteError> {
1662        let dataset = self
1663            .database
1664            .get_dataset_by_name(dataset_name, owner_id, None)
1665            .await
1666            .map_err(|error| {
1667                DeleteError::Runtime(format!(
1668                    "Failed to resolve dataset '{dataset_name}': {error}"
1669                ))
1670            })?
1671            .ok_or_else(|| {
1672                DeleteError::Validation(format!(
1673                    "Dataset '{dataset_name}' was not found for owner {owner_id}"
1674                ))
1675            })?;
1676
1677        self.resolve_dataset_list(vec![dataset]).await
1678    }
1679
1680    #[tracing::instrument(level = "debug", skip(self))]
1681    async fn resolve_user_scope(
1682        &self,
1683        owner_id: Uuid,
1684    ) -> Result<ResolvedDeleteTargets, DeleteError> {
1685        let datasets = self
1686            .database
1687            .list_datasets_by_owner(owner_id)
1688            .await
1689            .map_err(|error| {
1690                DeleteError::Runtime(format!(
1691                    "Failed to list datasets for owner {owner_id}: {error}"
1692                ))
1693            })?;
1694
1695        self.resolve_dataset_list(datasets).await
1696    }
1697
1698    #[tracing::instrument(level = "debug", skip(self))]
1699    async fn resolve_all_scope(&self) -> Result<ResolvedDeleteTargets, DeleteError> {
1700        let datasets =
1701            self.database.list_datasets().await.map_err(|error| {
1702                DeleteError::Runtime(format!("Failed to list datasets: {error}"))
1703            })?;
1704
1705        self.resolve_dataset_list(datasets).await
1706    }
1707
1708    async fn resolve_dataset_list(
1709        &self,
1710        datasets: Vec<Dataset>,
1711    ) -> Result<ResolvedDeleteTargets, DeleteError> {
1712        let mut links_to_detach = Vec::new();
1713        let mut candidate_data_ids = HashSet::new();
1714
1715        for dataset in &datasets {
1716            let data_items = self
1717                .database
1718                .get_dataset_data(dataset.id)
1719                .await
1720                .map_err(|error| {
1721                    DeleteError::Runtime(format!(
1722                        "Failed to load data for dataset '{}': {}",
1723                        dataset.name, error
1724                    ))
1725                })?;
1726
1727            for data in data_items {
1728                links_to_detach.push((dataset.id, data.id));
1729                candidate_data_ids.insert(data.id);
1730            }
1731        }
1732
1733        Ok(ResolvedDeleteTargets {
1734            datasets_to_delete: datasets,
1735            links_to_detach,
1736            candidate_data_ids: candidate_data_ids.into_iter().collect(),
1737        })
1738    }
1739
1740    async fn count_data_that_would_be_deleted(
1741        &self,
1742        candidate_data_ids: &[Uuid],
1743        links_to_detach: &[(Uuid, Uuid)],
1744    ) -> Result<usize, DeleteError> {
1745        let mut links_to_remove_per_data: HashMap<Uuid, usize> = HashMap::new();
1746        for (_, data_id) in links_to_detach {
1747            *links_to_remove_per_data.entry(*data_id).or_insert(0) += 1;
1748        }
1749
1750        let mut count = 0usize;
1751        for data_id in candidate_data_ids {
1752            let link_count = self
1753                .database
1754                .count_data_dataset_links(*data_id)
1755                .await
1756                .map_err(|error| {
1757                    DeleteError::Runtime(format!(
1758                        "Failed to count dataset links for data {data_id}: {error}"
1759                    ))
1760                })?;
1761            let to_remove = links_to_remove_per_data.get(data_id).copied().unwrap_or(0);
1762            if link_count <= to_remove {
1763                count += 1;
1764            }
1765        }
1766
1767        Ok(count)
1768    }
1769}
1770
1771/// Parse the `indexed_fields` JSON value into a list of field names.
1772///
1773/// The `indexed_fields` column is a JSON array of field names (e.g., `["text"]`,
1774/// `["name"]`). If it is not a JSON array, returns an empty list.
1775fn parse_indexed_fields(value: &serde_json::Value) -> Vec<String> {
1776    match value {
1777        serde_json::Value::Array(arr) => arr
1778            .iter()
1779            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1780            .collect(),
1781        _ => {
1782            warn!(
1783                "indexed_fields is not a JSON array: {:?}; skipping vector cleanup for this node",
1784                value
1785            );
1786            vec![]
1787        }
1788    }
1789}
1790
1791fn triplet_vector_id(edge: &GraphEdge) -> Uuid {
1792    Triplet::new(
1793        edge.source_node_id,
1794        edge.destination_node_id,
1795        edge.relationship_name.clone(),
1796        String::new(),
1797    )
1798    .id
1799}
1800
1801#[cfg(test)]
1802#[allow(
1803    clippy::unwrap_used,
1804    clippy::expect_used,
1805    reason = "test code — panics are acceptable failures"
1806)]
1807mod tests {
1808    use super::*;
1809    use cognee_database::{connect, initialize, ops};
1810    use cognee_graph::MockGraphDB;
1811    use cognee_models::{Data, Dataset};
1812    use cognee_storage::MockStorage;
1813    use cognee_vector::MockVectorDB;
1814
1815    // ------------------------------------------------------------------
1816    // Test helpers
1817    // ------------------------------------------------------------------
1818
1819    async fn make_service() -> (
1820        DeleteService,
1821        Arc<MockStorage>,
1822        Arc<cognee_database::DatabaseConnection>,
1823    ) {
1824        let db = connect("sqlite::memory:").await.unwrap();
1825        initialize(&db).await.unwrap();
1826        let db = Arc::new(db);
1827        let storage = Arc::new(MockStorage::new());
1828        let svc = DeleteService::new(
1829            storage.clone() as Arc<dyn StorageTrait>,
1830            db.clone() as Arc<dyn DeleteDb>,
1831        );
1832        (svc, storage, db)
1833    }
1834
1835    async fn make_service_with_graph_vector() -> (
1836        DeleteService,
1837        Arc<MockStorage>,
1838        Arc<cognee_database::DatabaseConnection>,
1839        Arc<MockGraphDB>,
1840        Arc<MockVectorDB>,
1841    ) {
1842        let db = connect("sqlite::memory:").await.unwrap();
1843        initialize(&db).await.unwrap();
1844        let db = Arc::new(db);
1845        let storage = Arc::new(MockStorage::new());
1846        let graph_db = Arc::new(MockGraphDB::new());
1847        let vector_db = Arc::new(MockVectorDB::new());
1848        let svc = DeleteService::new(
1849            storage.clone() as Arc<dyn StorageTrait>,
1850            db.clone() as Arc<dyn DeleteDb>,
1851        )
1852        .with_graph_db(graph_db.clone() as Arc<dyn GraphDBTrait>)
1853        .with_vector_db(vector_db.clone() as Arc<dyn VectorDB>);
1854        (svc, storage, db, graph_db, vector_db)
1855    }
1856
1857    /// Seed one dataset + one data item, attach them, and return their IDs.
1858    async fn seed_dataset_with_data(
1859        db: &cognee_database::DatabaseConnection,
1860        storage: &MockStorage,
1861        owner_id: Uuid,
1862        dataset_name: &str,
1863    ) -> (Uuid, Uuid) {
1864        let dataset = Dataset::new(dataset_name.to_string(), owner_id, None, Uuid::new_v4());
1865        let dataset_id = dataset.id;
1866        ops::datasets::create_dataset(db, dataset).await.unwrap();
1867
1868        let location = storage.store(b"test content", "test.txt").await.unwrap();
1869
1870        let data_id = Uuid::new_v4();
1871        let data = Data::builder(
1872            data_id,
1873            "test.txt",
1874            location,
1875            "file://test.txt",
1876            "txt",
1877            "text/plain",
1878            "hash_placeholder",
1879            owner_id,
1880        )
1881        .build();
1882        ops::data::create_data(db, data).await.unwrap();
1883        ops::datasets::attach_data_to_dataset(db, dataset_id, data_id)
1884            .await
1885            .unwrap();
1886
1887        (dataset_id, data_id)
1888    }
1889
1890    /// Seed provenance nodes in the relational DB.
1891    async fn seed_provenance_nodes(
1892        db: &cognee_database::DatabaseConnection,
1893        dataset_id: Uuid,
1894        data_id: Uuid,
1895        owner_id: Uuid,
1896        slugs: &[Uuid],
1897        node_type: &str,
1898        indexed_fields: serde_json::Value,
1899    ) {
1900        let nodes: Vec<GraphNode> = slugs
1901            .iter()
1902            .map(|slug| GraphNode {
1903                id: Uuid::new_v4(),
1904                slug: *slug,
1905                user_id: owner_id,
1906                data_id,
1907                dataset_id,
1908                label: Some(format!("node-{slug}")),
1909                node_type: node_type.to_string(),
1910                indexed_fields: indexed_fields.clone(),
1911                attributes: None,
1912                created_at: chrono::Utc::now(),
1913            })
1914            .collect();
1915        ops::graph_storage::upsert_nodes(db, &nodes).await.unwrap();
1916    }
1917
1918    /// Seed provenance edges in the relational DB.
1919    async fn seed_provenance_edges(
1920        db: &cognee_database::DatabaseConnection,
1921        dataset_id: Uuid,
1922        data_id: Uuid,
1923        owner_id: Uuid,
1924        slugs: &[Uuid],
1925        relationship_name: &str,
1926    ) {
1927        let edges: Vec<GraphEdge> = slugs
1928            .iter()
1929            .map(|slug| GraphEdge {
1930                id: Uuid::new_v4(),
1931                slug: *slug,
1932                user_id: owner_id,
1933                data_id,
1934                dataset_id,
1935                source_node_id: Uuid::new_v4(),
1936                destination_node_id: Uuid::new_v4(),
1937                relationship_name: relationship_name.to_string(),
1938                label: None,
1939                attributes: None,
1940                created_at: chrono::Utc::now(),
1941            })
1942            .collect();
1943        ops::graph_storage::upsert_edges(db, &edges).await.unwrap();
1944    }
1945
1946    // ------------------------------------------------------------------
1947    // Original tests (backward compatibility)
1948    // ------------------------------------------------------------------
1949
1950    #[tokio::test]
1951    async fn delete_dataset_with_force_removes_dataset_and_data() {
1952        let (svc, storage, db) = make_service().await;
1953        let owner = Uuid::new_v4();
1954        seed_dataset_with_data(&db, &storage, owner, "test_dataset").await;
1955
1956        let result = svc
1957            .execute(&DeleteRequest {
1958                scope: DeleteScope::Dataset {
1959                    owner_id: owner,
1960                    dataset_name: "test_dataset".to_string(),
1961                },
1962                mode: DeleteMode::Soft,
1963                memory_only: false,
1964            })
1965            .await
1966            .expect("execute should succeed");
1967
1968        assert_eq!(result.deleted_datasets, 1);
1969        assert_eq!(result.deleted_data, 1);
1970
1971        let still_exists = ops::datasets::get_dataset_by_name(&db, "test_dataset", owner, None)
1972            .await
1973            .unwrap();
1974        assert!(still_exists.is_none(), "dataset should be gone");
1975    }
1976
1977    #[tokio::test]
1978    async fn preview_does_not_mutate_database_state() {
1979        let (svc, storage, db) = make_service().await;
1980        let owner = Uuid::new_v4();
1981        let (_ds_id, data_id) = seed_dataset_with_data(&db, &storage, owner, "test_dataset").await;
1982
1983        let request = DeleteRequest {
1984            scope: DeleteScope::Dataset {
1985                owner_id: owner,
1986                dataset_name: "test_dataset".to_string(),
1987            },
1988            mode: DeleteMode::Soft,
1989            memory_only: false,
1990        };
1991
1992        let preview = svc.preview(&request).await.expect("preview should succeed");
1993
1994        assert_eq!(preview.datasets_to_delete, 1);
1995        assert_eq!(preview.data_to_delete, 1);
1996
1997        let still_exists = ops::datasets::get_dataset_by_name(&db, "test_dataset", owner, None)
1998            .await
1999            .unwrap();
2000        assert!(
2001            still_exists.is_some(),
2002            "dataset should still exist after preview"
2003        );
2004        let data_still_there = ops::data::get_data(&db, data_id).await.unwrap();
2005        assert!(
2006            data_still_there.is_some(),
2007            "data should be unchanged after preview"
2008        );
2009    }
2010
2011    #[tokio::test]
2012    async fn delete_nonexistent_dataset_returns_validation_error() {
2013        let (svc, _storage, _db) = make_service().await;
2014
2015        let err = svc
2016            .execute(&DeleteRequest {
2017                scope: DeleteScope::Dataset {
2018                    owner_id: Uuid::new_v4(),
2019                    dataset_name: "nonexistent".to_string(),
2020                },
2021                mode: DeleteMode::Soft,
2022                memory_only: false,
2023            })
2024            .await
2025            .expect_err("should fail for nonexistent dataset");
2026
2027        assert!(
2028            matches!(err, DeleteError::Validation(_)),
2029            "expected Validation error, got: {err:?}"
2030        );
2031    }
2032
2033    #[tokio::test]
2034    async fn shared_data_not_deleted_while_linked_to_another_dataset() {
2035        let (svc, storage, db) = make_service().await;
2036        let owner = Uuid::new_v4();
2037
2038        let ds1 = Dataset::new("dataset1".to_string(), owner, None, Uuid::new_v4());
2039        let ds2 = Dataset::new("dataset2".to_string(), owner, None, Uuid::new_v4());
2040        let ds1_id = ds1.id;
2041        let ds2_id = ds2.id;
2042        ops::datasets::create_dataset(&db, ds1).await.unwrap();
2043        ops::datasets::create_dataset(&db, ds2).await.unwrap();
2044
2045        let location = storage
2046            .store(b"shared content", "shared.txt")
2047            .await
2048            .unwrap();
2049        let data_id = Uuid::new_v4();
2050        let data = Data::builder(
2051            data_id,
2052            "shared.txt",
2053            location,
2054            "file://shared.txt",
2055            "txt",
2056            "text/plain",
2057            "shared_hash",
2058            owner,
2059        )
2060        .build();
2061        ops::data::create_data(&db, data).await.unwrap();
2062        ops::datasets::attach_data_to_dataset(&db, ds1_id, data_id)
2063            .await
2064            .unwrap();
2065        ops::datasets::attach_data_to_dataset(&db, ds2_id, data_id)
2066            .await
2067            .unwrap();
2068
2069        let result = svc
2070            .execute(&DeleteRequest {
2071                scope: DeleteScope::Dataset {
2072                    owner_id: owner,
2073                    dataset_name: "dataset1".to_string(),
2074                },
2075                mode: DeleteMode::Soft,
2076                memory_only: false,
2077            })
2078            .await
2079            .expect("execute should succeed");
2080
2081        assert_eq!(result.deleted_datasets, 1);
2082        assert_eq!(
2083            result.deleted_data, 0,
2084            "data must not be deleted while still linked to dataset2"
2085        );
2086
2087        let data_still_there = ops::data::get_data(&db, data_id).await.unwrap();
2088        assert!(data_still_there.is_some(), "data record must survive");
2089    }
2090
2091    #[tokio::test]
2092    async fn data_deleted_when_last_dataset_link_removed() {
2093        let (svc, storage, db) = make_service().await;
2094        let owner = Uuid::new_v4();
2095
2096        let ds1 = Dataset::new("dataset1".to_string(), owner, None, Uuid::new_v4());
2097        let ds2 = Dataset::new("dataset2".to_string(), owner, None, Uuid::new_v4());
2098        let ds1_id = ds1.id;
2099        let ds2_id = ds2.id;
2100        ops::datasets::create_dataset(&db, ds1).await.unwrap();
2101        ops::datasets::create_dataset(&db, ds2).await.unwrap();
2102
2103        let location = storage
2104            .store(b"shared content", "shared.txt")
2105            .await
2106            .unwrap();
2107        let data_id = Uuid::new_v4();
2108        let data = Data::builder(
2109            data_id,
2110            "shared.txt",
2111            location,
2112            "file://shared.txt",
2113            "txt",
2114            "text/plain",
2115            "shared_hash",
2116            owner,
2117        )
2118        .build();
2119        ops::data::create_data(&db, data).await.unwrap();
2120        ops::datasets::attach_data_to_dataset(&db, ds1_id, data_id)
2121            .await
2122            .unwrap();
2123        ops::datasets::attach_data_to_dataset(&db, ds2_id, data_id)
2124            .await
2125            .unwrap();
2126
2127        svc.execute(&DeleteRequest {
2128            scope: DeleteScope::Dataset {
2129                owner_id: owner,
2130                dataset_name: "dataset1".to_string(),
2131            },
2132            mode: DeleteMode::Soft,
2133            memory_only: false,
2134        })
2135        .await
2136        .expect("delete dataset1");
2137
2138        let result = svc
2139            .execute(&DeleteRequest {
2140                scope: DeleteScope::Dataset {
2141                    owner_id: owner,
2142                    dataset_name: "dataset2".to_string(),
2143                },
2144                mode: DeleteMode::Soft,
2145                memory_only: false,
2146            })
2147            .await
2148            .expect("delete dataset2");
2149
2150        assert_eq!(
2151            result.deleted_data, 1,
2152            "data must be deleted when last link is removed"
2153        );
2154
2155        let data_gone = ops::data::get_data(&db, data_id).await.unwrap();
2156        assert!(data_gone.is_none(), "data record must be gone");
2157    }
2158
2159    #[tokio::test]
2160    async fn delete_dataset_with_wrong_owner_returns_validation_error() {
2161        let (svc, storage, db) = make_service().await;
2162        let owner_a = Uuid::new_v4();
2163        let owner_b = Uuid::new_v4();
2164
2165        seed_dataset_with_data(&db, &storage, owner_a, "owner_a_dataset").await;
2166
2167        let err = svc
2168            .execute(&DeleteRequest {
2169                scope: DeleteScope::Dataset {
2170                    owner_id: owner_b,
2171                    dataset_name: "owner_a_dataset".to_string(),
2172                },
2173                mode: DeleteMode::Soft,
2174                memory_only: false,
2175            })
2176            .await
2177            .expect_err("should fail for wrong owner");
2178
2179        assert!(
2180            matches!(err, DeleteError::Validation(_)),
2181            "expected Validation error for wrong owner, got: {err:?}"
2182        );
2183    }
2184
2185    // ------------------------------------------------------------------
2186    // New tests: graph/vector cleanup
2187    // ------------------------------------------------------------------
2188
2189    #[tokio::test]
2190    async fn delete_dataset_cleans_graph_nodes() {
2191        let (svc, storage, db, graph_db, _vector_db) = make_service_with_graph_vector().await;
2192        let owner = Uuid::new_v4();
2193        let (dataset_id, data_id) = seed_dataset_with_data(&db, &storage, owner, "graph_ds").await;
2194
2195        // Seed provenance and graph nodes
2196        let slug1 = Uuid::new_v4();
2197        let slug2 = Uuid::new_v4();
2198        seed_provenance_nodes(
2199            &db,
2200            dataset_id,
2201            data_id,
2202            owner,
2203            &[slug1, slug2],
2204            "Entity",
2205            serde_json::json!(["name"]),
2206        )
2207        .await;
2208
2209        // Add matching nodes to graph DB
2210        graph_db
2211            .add_node_raw(serde_json::json!({"id": slug1.to_string(), "name": "Alice"}))
2212            .await
2213            .unwrap();
2214        graph_db
2215            .add_node_raw(serde_json::json!({"id": slug2.to_string(), "name": "Bob"}))
2216            .await
2217            .unwrap();
2218        assert_eq!(graph_db.node_count(), 2);
2219
2220        // Execute delete
2221        let result = svc
2222            .execute(&DeleteRequest {
2223                scope: DeleteScope::Dataset {
2224                    owner_id: owner,
2225                    dataset_name: "graph_ds".to_string(),
2226                },
2227                mode: DeleteMode::Soft,
2228                memory_only: false,
2229            })
2230            .await
2231            .expect("execute should succeed");
2232
2233        assert_eq!(result.deleted_datasets, 1);
2234        assert_eq!(result.deleted_graph_nodes, 2);
2235        assert_eq!(graph_db.node_count(), 0, "graph nodes should be cleaned up");
2236    }
2237
2238    #[tokio::test]
2239    async fn delete_dataset_cleans_vector_points() {
2240        let (svc, storage, db, _graph_db, vector_db) = make_service_with_graph_vector().await;
2241        let owner = Uuid::new_v4();
2242        let (dataset_id, data_id) = seed_dataset_with_data(&db, &storage, owner, "vector_ds").await;
2243
2244        // Seed provenance nodes
2245        let slug1 = Uuid::new_v4();
2246        seed_provenance_nodes(
2247            &db,
2248            dataset_id,
2249            data_id,
2250            owner,
2251            &[slug1],
2252            "Entity",
2253            serde_json::json!(["name"]),
2254        )
2255        .await;
2256
2257        // Create collection and index a point
2258        vector_db
2259            .create_collection("Entity", "name", 3)
2260            .await
2261            .unwrap();
2262        let point = cognee_vector::VectorPoint::new(slug1, vec![1.0, 0.0, 0.0]);
2263        vector_db
2264            .index_points("Entity", "name", &[point])
2265            .await
2266            .unwrap();
2267        assert_eq!(
2268            vector_db.collection_size("Entity", "name").await.unwrap(),
2269            1
2270        );
2271
2272        // Execute delete
2273        let result = svc
2274            .execute(&DeleteRequest {
2275                scope: DeleteScope::Dataset {
2276                    owner_id: owner,
2277                    dataset_name: "vector_ds".to_string(),
2278                },
2279                mode: DeleteMode::Soft,
2280                memory_only: false,
2281            })
2282            .await
2283            .expect("execute should succeed");
2284
2285        assert_eq!(result.deleted_datasets, 1);
2286        assert_eq!(result.deleted_vector_points, 1);
2287        assert_eq!(
2288            vector_db.collection_size("Entity", "name").await.unwrap(),
2289            0,
2290            "vector point should be removed"
2291        );
2292    }
2293
2294    #[tokio::test]
2295    async fn delete_all_wipes_graph_and_vector() {
2296        let (svc, storage, db, graph_db, vector_db) = make_service_with_graph_vector().await;
2297        let owner = Uuid::new_v4();
2298        seed_dataset_with_data(&db, &storage, owner, "all_ds").await;
2299
2300        // Add some data to graph and vector
2301        graph_db
2302            .add_node_raw(serde_json::json!({"id": "node1", "name": "Alice"}))
2303            .await
2304            .unwrap();
2305        vector_db
2306            .create_collection("Entity", "name", 3)
2307            .await
2308            .unwrap();
2309        let point = cognee_vector::VectorPoint::new(Uuid::new_v4(), vec![1.0, 0.0, 0.0]);
2310        vector_db
2311            .index_points("Entity", "name", &[point])
2312            .await
2313            .unwrap();
2314
2315        // Execute delete all
2316        let result = svc
2317            .execute(&DeleteRequest {
2318                scope: DeleteScope::All,
2319                mode: DeleteMode::Soft,
2320                memory_only: false,
2321            })
2322            .await
2323            .expect("execute should succeed");
2324
2325        assert_eq!(result.deleted_datasets, 1);
2326        assert!(
2327            graph_db.is_empty().await.unwrap(),
2328            "graph should be completely wiped"
2329        );
2330        assert!(
2331            !vector_db.has_collection("Entity", "name").await.unwrap(),
2332            "vector collection should be deleted"
2333        );
2334    }
2335
2336    #[tokio::test]
2337    async fn delete_all_reports_provenance_backed_graph_and_vector_counts() {
2338        let (svc, storage, db, _graph_db, _vector_db) = make_service_with_graph_vector().await;
2339        let owner = Uuid::new_v4();
2340        let (dataset_id, data_id) =
2341            seed_dataset_with_data(&db, &storage, owner, "all_counts_ds").await;
2342
2343        let node_slugs = [Uuid::new_v4(), Uuid::new_v4()];
2344        seed_provenance_nodes(
2345            &db,
2346            dataset_id,
2347            data_id,
2348            owner,
2349            &node_slugs,
2350            "Entity",
2351            serde_json::json!(["name", "description"]),
2352        )
2353        .await;
2354
2355        let edge_slug = Uuid::new_v4();
2356        seed_provenance_edges(&db, dataset_id, data_id, owner, &[edge_slug], "knows").await;
2357
2358        let request = DeleteRequest {
2359            scope: DeleteScope::All,
2360            mode: DeleteMode::Soft,
2361            memory_only: false,
2362        };
2363
2364        let preview = svc.preview(&request).await.expect("preview should succeed");
2365        assert_eq!(preview.graph_nodes_to_delete, 2);
2366        assert_eq!(preview.vector_points_to_delete, 6);
2367        assert_eq!(preview.provenance_nodes_to_delete, 2);
2368        assert_eq!(preview.provenance_edges_to_delete, 1);
2369
2370        let result = svc.execute(&request).await.expect("execute should succeed");
2371        assert_eq!(result.deleted_graph_nodes, preview.graph_nodes_to_delete);
2372        assert_eq!(
2373            result.deleted_vector_points,
2374            preview.vector_points_to_delete
2375        );
2376        assert_eq!(
2377            result.deleted_provenance_nodes,
2378            preview.provenance_nodes_to_delete
2379        );
2380        assert_eq!(
2381            result.deleted_provenance_edges,
2382            preview.provenance_edges_to_delete
2383        );
2384    }
2385
2386    #[tokio::test]
2387    async fn delete_without_graph_db_emits_warning() {
2388        let (svc, storage, db) = make_service().await;
2389        let owner = Uuid::new_v4();
2390        let (dataset_id, data_id) =
2391            seed_dataset_with_data(&db, &storage, owner, "no_graph_ds").await;
2392
2393        // Seed provenance so there IS something to clean up
2394        let slug = Uuid::new_v4();
2395        seed_provenance_nodes(
2396            &db,
2397            dataset_id,
2398            data_id,
2399            owner,
2400            &[slug],
2401            "Entity",
2402            serde_json::json!(["name"]),
2403        )
2404        .await;
2405
2406        let result = svc
2407            .execute(&DeleteRequest {
2408                scope: DeleteScope::Dataset {
2409                    owner_id: owner,
2410                    dataset_name: "no_graph_ds".to_string(),
2411                },
2412                mode: DeleteMode::Soft,
2413                memory_only: false,
2414            })
2415            .await
2416            .expect("execute should succeed");
2417
2418        assert!(
2419            result
2420                .warnings
2421                .iter()
2422                .any(|w| w.contains("Graph DB not configured")),
2423            "should warn about missing graph DB, got: {:?}",
2424            result.warnings
2425        );
2426        assert!(
2427            result
2428                .warnings
2429                .iter()
2430                .any(|w| w.contains("Vector DB not configured")),
2431            "should warn about missing vector DB, got: {:?}",
2432            result.warnings
2433        );
2434    }
2435
2436    #[tokio::test]
2437    async fn delete_dataset_cleans_edge_vector_points() {
2438        let (svc, storage, db, _graph_db, vector_db) = make_service_with_graph_vector().await;
2439        let owner = Uuid::new_v4();
2440        let (dataset_id, data_id) = seed_dataset_with_data(&db, &storage, owner, "edge_ds").await;
2441
2442        // Seed provenance edges
2443        let source_id = Uuid::new_v4();
2444        let target_id = Uuid::new_v4();
2445        let relationship_name = "knows";
2446        let triplet_id = Triplet::new(
2447            source_id,
2448            target_id,
2449            relationship_name.to_string(),
2450            String::new(),
2451        )
2452        .id;
2453        ops::graph_storage::upsert_edges(
2454            &db,
2455            &[GraphEdge {
2456                id: Uuid::new_v4(),
2457                slug: triplet_id,
2458                user_id: owner,
2459                data_id,
2460                dataset_id,
2461                source_node_id: source_id,
2462                destination_node_id: target_id,
2463                relationship_name: relationship_name.to_string(),
2464                label: None,
2465                attributes: None,
2466                created_at: chrono::Utc::now(),
2467            }],
2468        )
2469        .await
2470        .unwrap();
2471
2472        // Create EdgeType and Triplet collections and index points
2473        vector_db
2474            .create_collection("EdgeType", "relationship_name", 3)
2475            .await
2476            .unwrap();
2477        vector_db
2478            .create_collection("Triplet", "text", 3)
2479            .await
2480            .unwrap();
2481
2482        let et_point = cognee_vector::VectorPoint::new(
2483            EdgeType::deterministic_id(relationship_name),
2484            vec![1.0, 0.0, 0.0],
2485        );
2486        vector_db
2487            .index_points("EdgeType", "relationship_name", &[et_point])
2488            .await
2489            .unwrap();
2490        let triplet_point = cognee_vector::VectorPoint::new(triplet_id, vec![0.0, 1.0, 0.0]);
2491        vector_db
2492            .index_points("Triplet", "text", &[triplet_point])
2493            .await
2494            .unwrap();
2495
2496        // Execute delete
2497        let result = svc
2498            .execute(&DeleteRequest {
2499                scope: DeleteScope::Dataset {
2500                    owner_id: owner,
2501                    dataset_name: "edge_ds".to_string(),
2502                },
2503                mode: DeleteMode::Soft,
2504                memory_only: false,
2505            })
2506            .await
2507            .expect("execute should succeed");
2508
2509        assert_eq!(result.deleted_datasets, 1);
2510        // Each edge contributes 1 point to EdgeType and 1 to Triplet = 2
2511        assert_eq!(result.deleted_vector_points, 2);
2512        assert_eq!(
2513            vector_db
2514                .collection_size("EdgeType", "relationship_name")
2515                .await
2516                .unwrap(),
2517            0,
2518            "EdgeType vector point should be removed"
2519        );
2520        assert_eq!(
2521            vector_db.collection_size("Triplet", "text").await.unwrap(),
2522            0,
2523            "Triplet vector point should be removed"
2524        );
2525    }
2526
2527    #[tokio::test]
2528    async fn delete_without_provenance_still_works() {
2529        let (svc, storage, db, graph_db, _vector_db) = make_service_with_graph_vector().await;
2530        let owner = Uuid::new_v4();
2531        seed_dataset_with_data(&db, &storage, owner, "no_prov_ds").await;
2532
2533        // No provenance seeded -- graph/vector cleanup should be a no-op
2534        let result = svc
2535            .execute(&DeleteRequest {
2536                scope: DeleteScope::Dataset {
2537                    owner_id: owner,
2538                    dataset_name: "no_prov_ds".to_string(),
2539                },
2540                mode: DeleteMode::Soft,
2541                memory_only: false,
2542            })
2543            .await
2544            .expect("execute should succeed");
2545
2546        assert_eq!(result.deleted_datasets, 1);
2547        assert_eq!(result.deleted_graph_nodes, 0);
2548        assert_eq!(result.deleted_vector_points, 0);
2549        assert!(
2550            graph_db.is_empty().await.unwrap(),
2551            "graph should still be empty"
2552        );
2553    }
2554
2555    #[tokio::test]
2556    async fn shared_node_not_removed_when_sibling_data_exists() {
2557        let (svc, storage, db, graph_db, _vector_db) = make_service_with_graph_vector().await;
2558        let owner = Uuid::new_v4();
2559
2560        // Create dataset + first data item
2561        let dataset = Dataset::new("shared_ds".to_string(), owner, None, Uuid::new_v4());
2562        let dataset_id = dataset.id;
2563        ops::datasets::create_dataset(&db, dataset).await.unwrap();
2564
2565        let loc1 = storage.store(b"content one", "one.txt").await.unwrap();
2566        let data_id_1 = Uuid::new_v4();
2567        let data1 = Data::builder(
2568            data_id_1,
2569            "one.txt",
2570            loc1,
2571            "file://one.txt",
2572            "txt",
2573            "text/plain",
2574            "hash_one",
2575            owner,
2576        )
2577        .build();
2578        ops::data::create_data(&db, data1).await.unwrap();
2579        ops::datasets::attach_data_to_dataset(&db, dataset_id, data_id_1)
2580            .await
2581            .unwrap();
2582
2583        let loc2 = storage.store(b"content two", "two.txt").await.unwrap();
2584        let data_id_2 = Uuid::new_v4();
2585        let data2 = Data::builder(
2586            data_id_2,
2587            "two.txt",
2588            loc2,
2589            "file://two.txt",
2590            "txt",
2591            "text/plain",
2592            "hash_two",
2593            owner,
2594        )
2595        .build();
2596        ops::data::create_data(&db, data2).await.unwrap();
2597        ops::datasets::attach_data_to_dataset(&db, dataset_id, data_id_2)
2598            .await
2599            .unwrap();
2600
2601        // Shared slug: both data items reference the same entity
2602        let shared_slug = Uuid::new_v4();
2603        let unique_slug = Uuid::new_v4();
2604
2605        // data_id_1 has shared_slug + unique_slug
2606        seed_provenance_nodes(
2607            &db,
2608            dataset_id,
2609            data_id_1,
2610            owner,
2611            &[shared_slug, unique_slug],
2612            "Entity",
2613            serde_json::json!(["name"]),
2614        )
2615        .await;
2616
2617        // data_id_2 also references shared_slug
2618        seed_provenance_nodes(
2619            &db,
2620            dataset_id,
2621            data_id_2,
2622            owner,
2623            &[shared_slug],
2624            "Entity",
2625            serde_json::json!(["name"]),
2626        )
2627        .await;
2628
2629        // Add graph nodes
2630        graph_db
2631            .add_node_raw(serde_json::json!({"id": shared_slug.to_string(), "name": "Shared"}))
2632            .await
2633            .unwrap();
2634        graph_db
2635            .add_node_raw(serde_json::json!({"id": unique_slug.to_string(), "name": "Unique"}))
2636            .await
2637            .unwrap();
2638        assert_eq!(graph_db.node_count(), 2);
2639
2640        // Delete data_id_1 from the dataset
2641        let result = svc
2642            .execute(&DeleteRequest {
2643                scope: DeleteScope::Data {
2644                    owner_id: owner,
2645                    data_id: data_id_1,
2646                    dataset_name: Some("shared_ds".to_string()),
2647                    delete_dataset_if_empty: false,
2648                },
2649                mode: DeleteMode::Soft,
2650                memory_only: false,
2651            })
2652            .await
2653            .expect("execute should succeed");
2654
2655        // unique_slug should be cleaned up; shared_slug should survive
2656        assert_eq!(
2657            result.deleted_graph_nodes, 1,
2658            "only the unique node should be deleted"
2659        );
2660        assert_eq!(
2661            graph_db.node_count(),
2662            1,
2663            "shared node should survive because data_id_2 also references it"
2664        );
2665    }
2666
2667    // ------------------------------------------------------------------
    // Relational DB provenance state verification
2669    // ------------------------------------------------------------------
2670
2671    #[tokio::test]
2672    async fn test_data_deletion_cleans_relational_provenance() {
2673        let (svc, storage, db, _graph_db, _vector_db) = make_service_with_graph_vector().await;
2674        let owner = Uuid::new_v4();
2675        let (dataset_id, data_id) =
2676            seed_dataset_with_data(&db, &storage, owner, "prov_data_ds").await;
2677
2678        // Seed provenance nodes and edges for this data item
2679        let node_slug = Uuid::new_v4();
2680        let edge_slug = Uuid::new_v4();
2681        seed_provenance_nodes(
2682            &db,
2683            dataset_id,
2684            data_id,
2685            owner,
2686            &[node_slug],
2687            "Entity",
2688            serde_json::json!(["name"]),
2689        )
2690        .await;
2691        seed_provenance_edges(&db, dataset_id, data_id, owner, &[edge_slug], "knows").await;
2692
2693        // Verify provenance rows exist before deletion
2694        let nodes_before = ops::graph_storage::get_nodes_by_data(&db, data_id, dataset_id)
2695            .await
2696            .unwrap();
2697        let edges_before = ops::graph_storage::get_edges_by_data(&db, data_id, dataset_id)
2698            .await
2699            .unwrap();
2700        assert_eq!(
2701            nodes_before.len(),
2702            1,
2703            "should have 1 provenance node before delete"
2704        );
2705        assert_eq!(
2706            edges_before.len(),
2707            1,
2708            "should have 1 provenance edge before delete"
2709        );
2710
2711        // Delete the data item
2712        let result = svc
2713            .execute(&DeleteRequest {
2714                scope: DeleteScope::Data {
2715                    owner_id: owner,
2716                    data_id,
2717                    dataset_name: Some("prov_data_ds".to_string()),
2718                    delete_dataset_if_empty: false,
2719                },
2720                mode: DeleteMode::Soft,
2721                memory_only: false,
2722            })
2723            .await
2724            .expect("execute should succeed");
2725
2726        assert_eq!(result.deleted_data, 1);
2727        assert_eq!(result.deleted_provenance_nodes, 1);
2728        assert_eq!(result.deleted_provenance_edges, 1);
2729
2730        // Query DB directly: provenance rows should be gone
2731        let nodes_after = ops::graph_storage::get_nodes_by_data(&db, data_id, dataset_id)
2732            .await
2733            .unwrap();
2734        let edges_after = ops::graph_storage::get_edges_by_data(&db, data_id, dataset_id)
2735            .await
2736            .unwrap();
2737        assert!(
2738            nodes_after.is_empty(),
2739            "provenance nodes should be gone after data deletion"
2740        );
2741        assert!(
2742            edges_after.is_empty(),
2743            "provenance edges should be gone after data deletion"
2744        );
2745    }
2746
2747    #[tokio::test]
2748    async fn test_dataset_deletion_cascades_relational_provenance() {
2749        let (svc, storage, db, _graph_db, _vector_db) = make_service_with_graph_vector().await;
2750        let owner = Uuid::new_v4();
2751        let (dataset_id, data_id) =
2752            seed_dataset_with_data(&db, &storage, owner, "prov_ds_ds").await;
2753
2754        // Seed multiple provenance nodes and edges
2755        let node_slugs = [Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
2756        let edge_slugs = [Uuid::new_v4(), Uuid::new_v4()];
2757        seed_provenance_nodes(
2758            &db,
2759            dataset_id,
2760            data_id,
2761            owner,
2762            &node_slugs,
2763            "Entity",
2764            serde_json::json!(["name"]),
2765        )
2766        .await;
2767        seed_provenance_edges(&db, dataset_id, data_id, owner, &edge_slugs, "related_to").await;
2768
2769        // Verify provenance rows exist
2770        let nodes_before = ops::graph_storage::get_nodes_by_dataset(&db, dataset_id)
2771            .await
2772            .unwrap();
2773        let edges_before = ops::graph_storage::get_edges_by_dataset(&db, dataset_id)
2774            .await
2775            .unwrap();
2776        assert_eq!(nodes_before.len(), 3);
2777        assert_eq!(edges_before.len(), 2);
2778
2779        // Delete the entire dataset
2780        let result = svc
2781            .execute(&DeleteRequest {
2782                scope: DeleteScope::Dataset {
2783                    owner_id: owner,
2784                    dataset_name: "prov_ds_ds".to_string(),
2785                },
2786                mode: DeleteMode::Soft,
2787                memory_only: false,
2788            })
2789            .await
2790            .expect("execute should succeed");
2791
2792        assert_eq!(result.deleted_datasets, 1);
2793        assert_eq!(result.deleted_data, 1);
2794        assert_eq!(result.deleted_provenance_nodes, 3);
2795        assert_eq!(result.deleted_provenance_edges, 2);
2796
2797        // Query DB directly: all provenance rows for this dataset should be gone
2798        let nodes_after = ops::graph_storage::get_nodes_by_dataset(&db, dataset_id)
2799            .await
2800            .unwrap();
2801        let edges_after = ops::graph_storage::get_edges_by_dataset(&db, dataset_id)
2802            .await
2803            .unwrap();
2804        assert!(
2805            nodes_after.is_empty(),
2806            "provenance nodes should be gone after dataset deletion"
2807        );
2808        assert!(
2809            edges_after.is_empty(),
2810            "provenance edges should be gone after dataset deletion"
2811        );
2812    }
2813
2814    #[tokio::test]
2815    async fn test_data_deletion_preserves_sibling_provenance() {
2816        let (svc, storage, db, _graph_db, _vector_db) = make_service_with_graph_vector().await;
2817        let owner = Uuid::new_v4();
2818
2819        // Create one dataset with two data items
2820        let dataset = Dataset::new("sibling_ds".to_string(), owner, None, Uuid::new_v4());
2821        let dataset_id = dataset.id;
2822        ops::datasets::create_dataset(&db, dataset).await.unwrap();
2823
2824        let loc1 = storage.store(b"content alpha", "alpha.txt").await.unwrap();
2825        let data_id_1 = Uuid::new_v4();
2826        let data1 = Data::builder(
2827            data_id_1,
2828            "alpha.txt",
2829            loc1,
2830            "file://alpha.txt",
2831            "txt",
2832            "text/plain",
2833            "hash_alpha",
2834            owner,
2835        )
2836        .build();
2837        ops::data::create_data(&db, data1).await.unwrap();
2838        ops::datasets::attach_data_to_dataset(&db, dataset_id, data_id_1)
2839            .await
2840            .unwrap();
2841
2842        let loc2 = storage.store(b"content beta", "beta.txt").await.unwrap();
2843        let data_id_2 = Uuid::new_v4();
2844        let data2 = Data::builder(
2845            data_id_2,
2846            "beta.txt",
2847            loc2,
2848            "file://beta.txt",
2849            "txt",
2850            "text/plain",
2851            "hash_beta",
2852            owner,
2853        )
2854        .build();
2855        ops::data::create_data(&db, data2).await.unwrap();
2856        ops::datasets::attach_data_to_dataset(&db, dataset_id, data_id_2)
2857            .await
2858            .unwrap();
2859
2860        // Seed separate provenance for each data item
2861        let slug_d1 = Uuid::new_v4();
2862        let edge_d1 = Uuid::new_v4();
2863        seed_provenance_nodes(
2864            &db,
2865            dataset_id,
2866            data_id_1,
2867            owner,
2868            &[slug_d1],
2869            "Entity",
2870            serde_json::json!(["name"]),
2871        )
2872        .await;
2873        seed_provenance_edges(&db, dataset_id, data_id_1, owner, &[edge_d1], "mentions").await;
2874
2875        let slug_d2 = Uuid::new_v4();
2876        let edge_d2 = Uuid::new_v4();
2877        seed_provenance_nodes(
2878            &db,
2879            dataset_id,
2880            data_id_2,
2881            owner,
2882            &[slug_d2],
2883            "Entity",
2884            serde_json::json!(["name"]),
2885        )
2886        .await;
2887        seed_provenance_edges(&db, dataset_id, data_id_2, owner, &[edge_d2], "describes").await;
2888
2889        // Verify 2 + 2 provenance rows total
2890        let all_nodes = ops::graph_storage::get_nodes_by_dataset(&db, dataset_id)
2891            .await
2892            .unwrap();
2893        let all_edges = ops::graph_storage::get_edges_by_dataset(&db, dataset_id)
2894            .await
2895            .unwrap();
2896        assert_eq!(all_nodes.len(), 2, "2 provenance nodes total before delete");
2897        assert_eq!(all_edges.len(), 2, "2 provenance edges total before delete");
2898
2899        // Delete only data_id_1
2900        let result = svc
2901            .execute(&DeleteRequest {
2902                scope: DeleteScope::Data {
2903                    owner_id: owner,
2904                    data_id: data_id_1,
2905                    dataset_name: Some("sibling_ds".to_string()),
2906                    delete_dataset_if_empty: false,
2907                },
2908                mode: DeleteMode::Soft,
2909                memory_only: false,
2910            })
2911            .await
2912            .expect("execute should succeed");
2913
2914        assert_eq!(result.deleted_provenance_nodes, 1);
2915        assert_eq!(result.deleted_provenance_edges, 1);
2916
2917        // data_id_1 provenance should be gone
2918        let d1_nodes = ops::graph_storage::get_nodes_by_data(&db, data_id_1, dataset_id)
2919            .await
2920            .unwrap();
2921        let d1_edges = ops::graph_storage::get_edges_by_data(&db, data_id_1, dataset_id)
2922            .await
2923            .unwrap();
2924        assert!(
2925            d1_nodes.is_empty(),
2926            "data_id_1 provenance nodes should be gone"
2927        );
2928        assert!(
2929            d1_edges.is_empty(),
2930            "data_id_1 provenance edges should be gone"
2931        );
2932
2933        // data_id_2 provenance should survive
2934        let d2_nodes = ops::graph_storage::get_nodes_by_data(&db, data_id_2, dataset_id)
2935            .await
2936            .unwrap();
2937        let d2_edges = ops::graph_storage::get_edges_by_data(&db, data_id_2, dataset_id)
2938            .await
2939            .unwrap();
2940        assert_eq!(
2941            d2_nodes.len(),
2942            1,
2943            "data_id_2 provenance nodes should survive sibling deletion"
2944        );
2945        assert_eq!(
2946            d2_edges.len(),
2947            1,
2948            "data_id_2 provenance edges should survive sibling deletion"
2949        );
2950    }
2951
2952    // ------------------------------------------------------------------
2953    // ACL authorization tests
2954    // ------------------------------------------------------------------
2955
2956    /// Helper to create an AuthorizedDeleteService backed by a real SQLite DB.
2957    ///
2958    /// The closed `AccessControl` (`impl AclDb for ...`) lives in the
2959    /// closed `cognee-access-control` crate. OSS tests drive ACL decisions
2960    /// through `MockAclDb`; this helper grants all four permissions on
2961    /// every dataset by default so behavioural assertions keep passing.
2962    async fn make_authorized_service() -> (
2963        AuthorizedDeleteService,
2964        Arc<MockStorage>,
2965        Arc<cognee_database::DatabaseConnection>,
2966        Arc<cognee_test_utils::MockAclDb>,
2967    ) {
2968        use cognee_database::AclDb;
2969
2970        let db = connect("sqlite::memory:").await.unwrap();
2971        initialize(&db).await.unwrap();
2972        let db = Arc::new(db);
2973        let storage = Arc::new(MockStorage::new());
2974        let acl = Arc::new(cognee_test_utils::MockAclDb::new());
2975        let svc = DeleteService::new(
2976            storage.clone() as Arc<dyn StorageTrait>,
2977            db.clone() as Arc<dyn DeleteDb>,
2978        );
2979        let auth_svc = AuthorizedDeleteService::new(
2980            svc,
2981            acl.clone() as Arc<dyn AclDb>,
2982            db.clone() as Arc<dyn DeleteDb>,
2983        );
2984        (auth_svc, storage, db, acl)
2985    }
2986
2987    /// Grant the four canonical permissions on `dataset_id` to `principal_id`
2988    /// through the supplied `MockAclDb`, matching the production semantics
2989    /// of `ops::acl::grant_all_permissions_on_dataset` (which a closed
2990    /// `AccessControl` would persist into the real `acls` table).
2991    async fn mock_grant_all_perms(
2992        acl: &Arc<cognee_test_utils::MockAclDb>,
2993        principal_id: Uuid,
2994        dataset_id: Uuid,
2995    ) {
2996        use cognee_database::AclDb;
2997        let acl_dyn: &dyn AclDb = acl.as_ref();
2998        acl_dyn
2999            .ensure_principal(principal_id, "user")
3000            .await
3001            .unwrap();
3002        for perm in ["read", "write", "delete", "share"] {
3003            acl_dyn
3004                .grant_permission(principal_id, dataset_id, perm)
3005                .await
3006                .unwrap();
3007        }
3008    }
3009
3010    #[tokio::test]
3011    async fn authorized_delete_succeeds_with_permission() {
3012        let (svc, storage, db, acl) = make_authorized_service().await;
3013        let owner = Uuid::new_v4();
3014        let (dataset_id, _data_id) =
3015            seed_dataset_with_data(&db, &storage, owner, "acl_ok_ds").await;
3016
3017        mock_grant_all_perms(&acl, owner, dataset_id).await;
3018
3019        let result = svc
3020            .execute(
3021                &DeleteRequest {
3022                    scope: DeleteScope::Dataset {
3023                        owner_id: owner,
3024                        dataset_name: "acl_ok_ds".to_string(),
3025                    },
3026                    mode: DeleteMode::Soft,
3027                    memory_only: false,
3028                },
3029                owner,
3030            )
3031            .await
3032            .expect("authorized delete should succeed");
3033
3034        assert_eq!(result.deleted_datasets, 1);
3035        assert_eq!(result.deleted_data, 1);
3036    }
3037
3038    #[tokio::test]
3039    async fn authorized_delete_fails_without_permission() {
3040        use cognee_database::AclDb;
3041        let (svc, storage, db, acl) = make_authorized_service().await;
3042        let owner = Uuid::new_v4();
3043        seed_dataset_with_data(&db, &storage, owner, "acl_fail_ds").await;
3044
3045        // Do NOT grant any permissions.
3046        // Ensure the principal exists but without delete permission.
3047        let acl_dyn: &dyn AclDb = acl.as_ref();
3048        acl_dyn.ensure_principal(owner, "user").await.unwrap();
3049
3050        let err = svc
3051            .execute(
3052                &DeleteRequest {
3053                    scope: DeleteScope::Dataset {
3054                        owner_id: owner,
3055                        dataset_name: "acl_fail_ds".to_string(),
3056                    },
3057                    mode: DeleteMode::Soft,
3058                    memory_only: false,
3059                },
3060                owner,
3061            )
3062            .await
3063            .expect_err("should fail without delete permission");
3064
3065        assert!(
3066            matches!(err, DeleteError::PermissionDenied(_)),
3067            "expected PermissionDenied, got: {err:?}"
3068        );
3069    }
3070
3071    #[tokio::test]
3072    async fn authorized_delete_with_wrong_principal_fails() {
3073        use cognee_database::AclDb;
3074        let (svc, storage, db, acl) = make_authorized_service().await;
3075        let owner_a = Uuid::new_v4();
3076        let owner_b = Uuid::new_v4();
3077        let (dataset_id, _data_id) =
3078            seed_dataset_with_data(&db, &storage, owner_a, "acl_wrong_principal").await;
3079
3080        // Grant permissions to owner_a only
3081        mock_grant_all_perms(&acl, owner_a, dataset_id).await;
3082        // Ensure owner_b exists as principal but has no permissions
3083        let acl_dyn: &dyn AclDb = acl.as_ref();
3084        acl_dyn.ensure_principal(owner_b, "user").await.unwrap();
3085
3086        let err = svc
3087            .execute(
3088                &DeleteRequest {
3089                    scope: DeleteScope::Dataset {
3090                        owner_id: owner_a,
3091                        dataset_name: "acl_wrong_principal".to_string(),
3092                    },
3093                    mode: DeleteMode::Soft,
3094                    memory_only: false,
3095                },
3096                owner_b, // wrong principal
3097            )
3098            .await
3099            .expect_err("should fail for wrong principal");
3100
3101        assert!(
3102            matches!(err, DeleteError::PermissionDenied(_)),
3103            "expected PermissionDenied for wrong principal, got: {err:?}"
3104        );
3105    }
3106
3107    #[tokio::test]
3108    async fn authorized_delete_after_permission_grant() {
3109        use cognee_database::AclDb;
3110        let (svc, storage, db, acl) = make_authorized_service().await;
3111        let owner_a = Uuid::new_v4();
3112        let user_b = Uuid::new_v4();
3113        let (dataset_id, _data_id) =
3114            seed_dataset_with_data(&db, &storage, owner_a, "acl_delegated").await;
3115
3116        // Owner A gets all permissions
3117        mock_grant_all_perms(&acl, owner_a, dataset_id).await;
3118
3119        // Grant "delete" to user B (delegated access)
3120        let acl_dyn: &dyn AclDb = acl.as_ref();
3121        acl_dyn.ensure_principal(user_b, "user").await.unwrap();
3122        acl_dyn
3123            .grant_permission(user_b, dataset_id, "delete")
3124            .await
3125            .unwrap();
3126
3127        // User B should now be able to delete
3128        let result = svc
3129            .execute(
3130                &DeleteRequest {
3131                    scope: DeleteScope::Dataset {
3132                        owner_id: owner_a,
3133                        dataset_name: "acl_delegated".to_string(),
3134                    },
3135                    mode: DeleteMode::Soft,
3136                    memory_only: false,
3137                },
3138                user_b,
3139            )
3140            .await
3141            .expect("delegated delete should succeed");
3142
3143        assert_eq!(result.deleted_datasets, 1);
3144    }
3145
3146    #[tokio::test]
3147    async fn delete_cascades_acl_entries() {
3148        // This test used to verify FK CASCADE on `acls.dataset_id` through
3149        // the real `ops::acl::*` standalone functions. With the `acls` table
3150        // moved to the closed `cognee-access-control` migration,
3151        // OSS cannot exercise the production CASCADE — that is now covered
3152        // by integration tests in the closed crate. To keep the OSS test
3153        // surface meaningful we drive the in-memory `MockAclDb` and verify
3154        // the grant + delete-dataset interaction at the trait level: the
3155        // grant exists before and is unaffected by deleting the OSS
3156        // `datasets` row (the mock has no FK cascade).
3157        //
3158        // TODO: replicate FK CASCADE verification in the closed
3159        // `cognee-access-control` integration tests.
3160        use cognee_database::AclDb;
3161        let db = connect("sqlite::memory:").await.unwrap();
3162        initialize(&db).await.unwrap();
3163        let owner = Uuid::new_v4();
3164        let storage = MockStorage::new();
3165        let acl = Arc::new(cognee_test_utils::MockAclDb::new());
3166
3167        let (dataset_id, _data_id) =
3168            seed_dataset_with_data(&db, &storage, owner, "cascade_ds").await;
3169        mock_grant_all_perms(&acl, owner, dataset_id).await;
3170
3171        // Verify ACLs exist via the mock.
3172        let acl_dyn: &dyn AclDb = acl.as_ref();
3173        let has_delete = acl_dyn
3174            .has_permission(owner, dataset_id, "delete")
3175            .await
3176            .unwrap();
3177        assert!(has_delete, "should have delete permission before cascade");
3178
3179        // Delete the dataset directly (bypasses DeleteService).
3180        ops::datasets::delete_dataset(&db, dataset_id)
3181            .await
3182            .unwrap();
3183
3184        // The mock has no FK cascade — the grant is still present. The
3185        // production CASCADE is exercised in the closed crate's tests.
3186        let has_delete_after = acl_dyn
3187            .has_permission(owner, dataset_id, "delete")
3188            .await
3189            .unwrap();
3190        assert!(
3191            has_delete_after,
3192            "MockAclDb does not cascade — production CASCADE coverage moved to \
3193             cognee-access-control integration tests."
3194        );
3195    }
3196
3197    #[tokio::test]
3198    async fn unauthorized_service_still_works() {
3199        // The plain DeleteService (without ACL wrapper) should continue
3200        // to work based on owner_id matching only.
3201        let (svc, storage, db) = make_service().await;
3202        let owner = Uuid::new_v4();
3203        seed_dataset_with_data(&db, &storage, owner, "no_acl_ds").await;
3204
3205        // No ACL grants needed — plain service doesn't check them
3206        let result = svc
3207            .execute(&DeleteRequest {
3208                scope: DeleteScope::Dataset {
3209                    owner_id: owner,
3210                    dataset_name: "no_acl_ds".to_string(),
3211                },
3212                mode: DeleteMode::Soft,
3213                memory_only: false,
3214            })
3215            .await
3216            .expect("plain service should succeed without ACL");
3217
3218        assert_eq!(result.deleted_datasets, 1);
3219        assert_eq!(result.deleted_data, 1);
3220    }
3221
3222    #[tokio::test]
3223    async fn authorized_preview_checks_acl() {
3224        use cognee_database::AclDb;
3225        let (svc, storage, db, acl) = make_authorized_service().await;
3226        let owner = Uuid::new_v4();
3227        seed_dataset_with_data(&db, &storage, owner, "preview_acl_ds").await;
3228
3229        // Without permission, even preview should fail
3230        let acl_dyn: &dyn AclDb = acl.as_ref();
3231        acl_dyn.ensure_principal(owner, "user").await.unwrap();
3232
3233        let err = svc
3234            .preview(
3235                &DeleteRequest {
3236                    scope: DeleteScope::Dataset {
3237                        owner_id: owner,
3238                        dataset_name: "preview_acl_ds".to_string(),
3239                    },
3240                    mode: DeleteMode::Soft,
3241                    memory_only: false,
3242                },
3243                owner,
3244            )
3245            .await
3246            .expect_err("preview should fail without permission");
3247
3248        assert!(
3249            matches!(err, DeleteError::PermissionDenied(_)),
3250            "expected PermissionDenied on preview, got: {err:?}"
3251        );
3252    }
3253
3254    // ------------------------------------------------------------------
3255    // Hard delete mode tests
3256    // ------------------------------------------------------------------
3257
3258    #[tokio::test]
3259    async fn hard_delete_removes_degree_one_entities() {
3260        let (svc, storage, db, graph_db, _vector_db) = make_service_with_graph_vector().await;
3261        let owner = Uuid::new_v4();
3262        let (dataset_id, _data_id) =
3263            seed_dataset_with_data(&db, &storage, owner, "hard_del_ds").await;
3264
3265        // Add an Entity node with degree 1 (one edge to a type node)
3266        graph_db
3267            .add_node_raw(serde_json::json!({
3268                "id": "entity-orphan",
3269                "type": "Entity",
3270                "name": "OrphanEntity"
3271            }))
3272            .await
3273            .unwrap();
3274        graph_db
3275            .add_node_raw(serde_json::json!({
3276                "id": "type-node",
3277                "type": "EntityType",
3278                "name": "Person"
3279            }))
3280            .await
3281            .unwrap();
3282        // Single edge: entity-orphan -> type-node
3283        // entity-orphan has degree 1, type-node has degree 1
3284        graph_db
3285            .add_edge("entity-orphan", "type-node", "is_a", None)
3286            .await
3287            .unwrap();
3288
3289        assert_eq!(graph_db.node_count(), 2);
3290
3291        // Seed provenance so cleanup_dataset has something to work with
3292        let entity_slug = Uuid::new_v4();
3293        seed_provenance_nodes(
3294            &db,
3295            dataset_id,
3296            _data_id,
3297            owner,
3298            &[entity_slug],
3299            "Entity",
3300            serde_json::json!(["name"]),
3301        )
3302        .await;
3303
3304        let result = svc
3305            .execute(&DeleteRequest {
3306                scope: DeleteScope::Dataset {
3307                    owner_id: owner,
3308                    dataset_name: "hard_del_ds".to_string(),
3309                },
3310                mode: DeleteMode::Hard,
3311                memory_only: false,
3312            })
3313            .await
3314            .expect("hard delete should succeed");
3315
3316        assert_eq!(result.deleted_datasets, 1);
3317        // After dataset cleanup removes tracked nodes, the orphan sweep finds
3318        // degree-one nodes. Since both nodes started with degree 1, both should
3319        // be swept (after normal cleanup may have already removed tracked ones,
3320        // the sweep catches any remaining degree-one nodes).
3321        // The exact counts depend on whether cleanup_dataset already removed
3322        // entity-orphan via provenance. The important thing is that the graph
3323        // ends up clean.
3324        let remaining_nodes = graph_db.node_count();
3325        assert_eq!(
3326            remaining_nodes, 0,
3327            "all orphan nodes should be removed after hard delete"
3328        );
3329    }
3330
3331    #[tokio::test]
3332    async fn soft_delete_does_not_sweep_orphans() {
3333        let (svc, storage, db, graph_db, _vector_db) = make_service_with_graph_vector().await;
3334        let owner = Uuid::new_v4();
3335        seed_dataset_with_data(&db, &storage, owner, "soft_del_ds").await;
3336
3337        // Add orphan entity (degree 1)
3338        graph_db
3339            .add_node_raw(serde_json::json!({
3340                "id": "orphan-soft",
3341                "type": "Entity",
3342                "name": "SoftOrphan"
3343            }))
3344            .await
3345            .unwrap();
3346        graph_db
3347            .add_node_raw(serde_json::json!({
3348                "id": "type-soft",
3349                "type": "EntityType",
3350                "name": "Thing"
3351            }))
3352            .await
3353            .unwrap();
3354        graph_db
3355            .add_edge("orphan-soft", "type-soft", "is_a", None)
3356            .await
3357            .unwrap();
3358
3359        let result = svc
3360            .execute(&DeleteRequest {
3361                scope: DeleteScope::Dataset {
3362                    owner_id: owner,
3363                    dataset_name: "soft_del_ds".to_string(),
3364                },
3365                mode: DeleteMode::Soft,
3366                memory_only: false,
3367            })
3368            .await
3369            .expect("soft delete should succeed");
3370
3371        // Soft delete should NOT sweep orphans
3372        assert_eq!(result.deleted_orphan_entities, 0);
3373        assert_eq!(result.deleted_orphan_entity_types, 0);
3374        // Orphan nodes should still exist
3375        assert_eq!(
3376            graph_db.node_count(),
3377            2,
3378            "orphan nodes should survive soft delete"
3379        );
3380    }
3381
3382    #[tokio::test]
3383    async fn hard_delete_preserves_well_connected_entities() {
3384        let (svc, storage, db, graph_db, _vector_db) = make_service_with_graph_vector().await;
3385        let owner = Uuid::new_v4();
3386        seed_dataset_with_data(&db, &storage, owner, "hard_preserve_ds").await;
3387
3388        // Add a well-connected Entity (degree 3)
3389        graph_db
3390            .add_node_raw(serde_json::json!({
3391                "id": "connected-entity",
3392                "type": "Entity",
3393                "name": "WellConnected"
3394            }))
3395            .await
3396            .unwrap();
3397        graph_db
3398            .add_node_raw(serde_json::json!({
3399                "id": "neighbor-1",
3400                "type": "DocumentChunk",
3401                "text": "chunk1"
3402            }))
3403            .await
3404            .unwrap();
3405        graph_db
3406            .add_node_raw(serde_json::json!({
3407                "id": "neighbor-2",
3408                "type": "DocumentChunk",
3409                "text": "chunk2"
3410            }))
3411            .await
3412            .unwrap();
3413        graph_db
3414            .add_node_raw(serde_json::json!({
3415                "id": "type-node",
3416                "type": "EntityType",
3417                "name": "Person"
3418            }))
3419            .await
3420            .unwrap();
3421
3422        // connected-entity has 3 edges -> degree 3
3423        graph_db
3424            .add_edge("neighbor-1", "connected-entity", "contains", None)
3425            .await
3426            .unwrap();
3427        graph_db
3428            .add_edge("neighbor-2", "connected-entity", "contains", None)
3429            .await
3430            .unwrap();
3431        graph_db
3432            .add_edge("connected-entity", "type-node", "is_a", None)
3433            .await
3434            .unwrap();
3435
3436        let result = svc
3437            .execute(&DeleteRequest {
3438                scope: DeleteScope::Dataset {
3439                    owner_id: owner,
3440                    dataset_name: "hard_preserve_ds".to_string(),
3441                },
3442                mode: DeleteMode::Hard,
3443                memory_only: false,
3444            })
3445            .await
3446            .expect("hard delete should succeed");
3447
3448        // The well-connected entity (degree 3) should survive
3449        assert!(
3450            graph_db.has_node("connected-entity").await.unwrap(),
3451            "well-connected entity should survive hard delete"
3452        );
3453        // No orphan entities should have been swept
3454        assert_eq!(result.deleted_orphan_entities, 0);
3455    }
3456
3457    // ------------------------------------------------------------------
3458    // Pipeline runs / pipeline_status cleanup tests
3459    // ------------------------------------------------------------------
3460
3461    #[tokio::test]
3462    async fn test_dataset_deletion_clears_pipeline_status() {
3463        let (svc, storage, db) = make_service().await;
3464        let owner = Uuid::new_v4();
3465        let (dataset_id, data_id) =
3466            seed_dataset_with_data(&db, &storage, owner, "ps_clear_ds").await;
3467
3468        // Set pipeline_status JSON on the data record with an entry for this dataset
3469        let dataset_id_hex = cognee_database::uuid_hex::to_hex(dataset_id);
3470        let status_json = serde_json::json!({
3471            "cognify_pipeline": {
3472                dataset_id_hex: "DATA_ITEM_PROCESSING_COMPLETED"
3473            }
3474        });
3475        let data = ops::data::get_data(&db, data_id).await.unwrap().unwrap();
3476        let updated_data = Data {
3477            pipeline_status: Some(status_json.to_string()),
3478            ..data
3479        };
3480        ops::data::update_data(&db, updated_data).await.unwrap();
3481
3482        // Verify pipeline_status is set
3483        let data_before = ops::data::get_data(&db, data_id).await.unwrap().unwrap();
3484        assert!(
3485            data_before.pipeline_status.is_some(),
3486            "pipeline_status should be set before deletion"
3487        );
3488
3489        // Delete the dataset
3490        let result = svc
3491            .execute(&DeleteRequest {
3492                scope: DeleteScope::Dataset {
3493                    owner_id: owner,
3494                    dataset_name: "ps_clear_ds".to_string(),
3495                },
3496                mode: DeleteMode::Soft,
3497                memory_only: false,
3498            })
3499            .await
3500            .expect("execute should succeed");
3501
3502        assert_eq!(result.deleted_datasets, 1);
3503        assert_eq!(result.cleared_pipeline_statuses, 1);
3504
3505        // Data record should still exist (was deleted because only link was
3506        // to the deleted dataset). Let us verify by creating a scenario where
3507        // the data survives deletion.
3508    }
3509
3510    #[tokio::test]
3511    async fn test_dataset_deletion_clears_pipeline_status_data_survives() {
3512        let (svc, storage, db) = make_service().await;
3513        let owner = Uuid::new_v4();
3514
3515        // Create two datasets sharing one data item
3516        let ds1 = Dataset::new("ps_ds1".to_string(), owner, None, Uuid::new_v4());
3517        let ds2 = Dataset::new("ps_ds2".to_string(), owner, None, Uuid::new_v4());
3518        let ds1_id = ds1.id;
3519        let ds2_id = ds2.id;
3520        ops::datasets::create_dataset(&db, ds1).await.unwrap();
3521        ops::datasets::create_dataset(&db, ds2).await.unwrap();
3522
3523        let location = storage
3524            .store(b"shared content", "shared.txt")
3525            .await
3526            .unwrap();
3527        let data_id = Uuid::new_v4();
3528        let data = Data::builder(
3529            data_id,
3530            "shared.txt",
3531            location,
3532            "file://shared.txt",
3533            "txt",
3534            "text/plain",
3535            "shared_hash",
3536            owner,
3537        )
3538        .build();
3539        ops::data::create_data(&db, data).await.unwrap();
3540        ops::datasets::attach_data_to_dataset(&db, ds1_id, data_id)
3541            .await
3542            .unwrap();
3543        ops::datasets::attach_data_to_dataset(&db, ds2_id, data_id)
3544            .await
3545            .unwrap();
3546
3547        // Set pipeline_status with entries for both datasets
3548        let ds1_hex = cognee_database::uuid_hex::to_hex(ds1_id);
3549        let ds2_hex = cognee_database::uuid_hex::to_hex(ds2_id);
3550        let status_json = serde_json::json!({
3551            "cognify_pipeline": {
3552                ds1_hex.clone(): "DATA_ITEM_PROCESSING_COMPLETED",
3553                ds2_hex.clone(): "DATA_ITEM_PROCESSING_COMPLETED"
3554            }
3555        });
3556        let data_record = ops::data::get_data(&db, data_id).await.unwrap().unwrap();
3557        let updated_data = Data {
3558            pipeline_status: Some(status_json.to_string()),
3559            ..data_record
3560        };
3561        ops::data::update_data(&db, updated_data).await.unwrap();
3562
3563        // Delete dataset 1 only
3564        let result = svc
3565            .execute(&DeleteRequest {
3566                scope: DeleteScope::Dataset {
3567                    owner_id: owner,
3568                    dataset_name: "ps_ds1".to_string(),
3569                },
3570                mode: DeleteMode::Soft,
3571                memory_only: false,
3572            })
3573            .await
3574            .expect("execute should succeed");
3575
3576        assert_eq!(result.deleted_datasets, 1);
3577        assert_eq!(
3578            result.deleted_data, 0,
3579            "data should survive because it's still linked to ds2"
3580        );
3581        assert_eq!(result.cleared_pipeline_statuses, 1);
3582
3583        // Verify pipeline_status: ds1 entry should be removed, ds2 entry should remain
3584        let data_after = ops::data::get_data(&db, data_id).await.unwrap().unwrap();
3585        let status_after: serde_json::Value =
3586            serde_json::from_str(data_after.pipeline_status.as_deref().unwrap_or("{}")).unwrap();
3587        let cognify_obj = status_after
3588            .get("cognify_pipeline")
3589            .and_then(|v| v.as_object())
3590            .expect("cognify_pipeline should still exist");
3591
3592        assert!(
3593            !cognify_obj.contains_key(&ds1_hex),
3594            "ds1 entry should be removed from pipeline_status"
3595        );
3596        assert!(
3597            cognify_obj.contains_key(&ds2_hex),
3598            "ds2 entry should remain in pipeline_status"
3599        );
3600    }
3601
3602    #[tokio::test]
3603    async fn test_data_deletion_invalidates_pipeline_cache() {
3604        let (svc, storage, db) = make_service().await;
3605        let owner = Uuid::new_v4();
3606        let (dataset_id, data_id) =
3607            seed_dataset_with_data(&db, &storage, owner, "pr_invalidate_ds").await;
3608
3609        // Insert a pipeline_runs row for this dataset
3610        let pipeline_run = cognee_database::PipelineRun {
3611            id: Uuid::new_v4(),
3612            created_at: chrono::Utc::now(),
3613            status: cognee_database::PipelineRunStatus::Completed,
3614            pipeline_run_id: Uuid::new_v4(),
3615            pipeline_name: "cognify_pipeline".to_string(),
3616            pipeline_id: Uuid::new_v4(),
3617            dataset_id: Some(dataset_id),
3618            run_info: None,
3619        };
3620        ops::pipeline_runs::create_pipeline_run(&db, pipeline_run)
3621            .await
3622            .unwrap();
3623
3624        // Verify pipeline_run exists
3625        let status_before =
3626            ops::pipeline_runs::get_latest_pipeline_status(&db, "cognify_pipeline", dataset_id)
3627                .await
3628                .unwrap();
3629        assert!(
3630            status_before.is_some(),
3631            "pipeline run should exist before data deletion"
3632        );
3633
3634        // Delete the data item (data-scoped)
3635        let result = svc
3636            .execute(&DeleteRequest {
3637                scope: DeleteScope::Data {
3638                    owner_id: owner,
3639                    data_id,
3640                    dataset_name: Some("pr_invalidate_ds".to_string()),
3641                    delete_dataset_if_empty: false,
3642                },
3643                mode: DeleteMode::Soft,
3644                memory_only: false,
3645            })
3646            .await
3647            .expect("execute should succeed");
3648
3649        assert_eq!(result.deleted_data, 1);
3650        assert_eq!(
3651            result.deleted_pipeline_runs, 1,
3652            "pipeline_runs row should be deleted"
3653        );
3654
3655        // Verify pipeline_run is gone
3656        let status_after =
3657            ops::pipeline_runs::get_latest_pipeline_status(&db, "cognify_pipeline", dataset_id)
3658                .await
3659                .unwrap();
3660        assert!(
3661            status_after.is_none(),
3662            "pipeline run should be invalidated after data deletion"
3663        );
3664    }
3665
3666    #[tokio::test]
3667    async fn test_dataset_deletion_preserves_pipeline_runs() {
3668        // Post-08-01: the `pipeline_runs.dataset_id` FK CASCADE has been
3669        // dropped (Python parity — Python's `pipeline_runs.dataset_id` is a
3670        // plain nullable column with no FK). The audit-trail row therefore
3671        // survives a dataset deletion; the orphaned row simply retains its
3672        // historical `dataset_id` value pointing at a now-deleted dataset.
3673        let (svc, storage, db) = make_service().await;
3674        let owner = Uuid::new_v4();
3675        let (dataset_id, _data_id) =
3676            seed_dataset_with_data(&db, &storage, owner, "pr_cascade_ds").await;
3677
3678        // Insert a pipeline_runs row for this dataset
3679        let pipeline_run = cognee_database::PipelineRun {
3680            id: Uuid::new_v4(),
3681            created_at: chrono::Utc::now(),
3682            status: cognee_database::PipelineRunStatus::Completed,
3683            pipeline_run_id: Uuid::new_v4(),
3684            pipeline_name: "cognify_pipeline".to_string(),
3685            pipeline_id: Uuid::new_v4(),
3686            dataset_id: Some(dataset_id),
3687            run_info: None,
3688        };
3689        ops::pipeline_runs::create_pipeline_run(&db, pipeline_run)
3690            .await
3691            .unwrap();
3692
3693        // Verify pipeline_run exists
3694        let status_before =
3695            ops::pipeline_runs::get_latest_pipeline_status(&db, "cognify_pipeline", dataset_id)
3696                .await
3697                .unwrap();
3698        assert!(
3699            status_before.is_some(),
3700            "pipeline run should exist before dataset deletion"
3701        );
3702
3703        // Delete the dataset
3704        let result = svc
3705            .execute(&DeleteRequest {
3706                scope: DeleteScope::Dataset {
3707                    owner_id: owner,
3708                    dataset_name: "pr_cascade_ds".to_string(),
3709                },
3710                mode: DeleteMode::Soft,
3711                memory_only: false,
3712            })
3713            .await
3714            .expect("execute should succeed");
3715
3716        assert_eq!(result.deleted_datasets, 1);
3717
3718        // Post-08-01: pipeline_runs are NOT cascade-deleted (the FK is gone).
3719        // The audit row survives, still keyed by the now-orphaned dataset_id.
3720        let status_after =
3721            ops::pipeline_runs::get_latest_pipeline_status(&db, "cognify_pipeline", dataset_id)
3722                .await
3723                .unwrap();
3724        assert!(
3725            status_after.is_some(),
3726            "pipeline run should survive dataset deletion (no FK CASCADE post-08-01)"
3727        );
3728    }
3729
3730    // ------------------------------------------------------------------
3731    // Search history cleanup tests
3732    // ------------------------------------------------------------------
3733
3734    /// Helper: seed search history queries (and one result per query) for a user.
3735    async fn seed_search_history(
3736        db: &cognee_database::DatabaseConnection,
3737        user_id: Uuid,
3738        count: usize,
3739    ) {
3740        for i in 0..count {
3741            let query_id = ops::search_history::log_query(
3742                db,
3743                &format!("test query {i}"),
3744                "GraphCompletion",
3745                Some(user_id),
3746            )
3747            .await
3748            .unwrap();
3749            ops::search_history::log_result(
3750                db,
3751                query_id,
3752                &format!("{{\"result\": {i}}}"),
3753                Some(user_id),
3754            )
3755            .await
3756            .unwrap();
3757        }
3758    }
3759
3760    #[tokio::test]
3761    async fn user_scoped_delete_clears_search_history() {
3762        let (svc, storage, db) = make_service().await;
3763        let user_a = Uuid::new_v4();
3764        let user_b = Uuid::new_v4();
3765
3766        // Seed datasets so the user-scoped delete has something to resolve
3767        seed_dataset_with_data(&db, &storage, user_a, "sh_user_a_ds").await;
3768        seed_dataset_with_data(&db, &storage, user_b, "sh_user_b_ds").await;
3769
3770        // Seed search history for both users
3771        seed_search_history(&db, user_a, 3).await;
3772        seed_search_history(&db, user_b, 2).await;
3773
3774        // Verify initial counts
3775        let count_a = ops::search_history::count_queries_by_user(&db, user_a)
3776            .await
3777            .unwrap();
3778        let count_b = ops::search_history::count_queries_by_user(&db, user_b)
3779            .await
3780            .unwrap();
3781        assert_eq!(count_a, 3);
3782        assert_eq!(count_b, 2);
3783
3784        // Delete user A
3785        let result = svc
3786            .execute(&DeleteRequest {
3787                scope: DeleteScope::User { owner_id: user_a },
3788                mode: DeleteMode::Soft,
3789                memory_only: false,
3790            })
3791            .await
3792            .expect("execute should succeed");
3793
3794        assert_eq!(result.deleted_search_queries, 3);
3795
3796        // User A's search history should be gone
3797        let count_a_after = ops::search_history::count_queries_by_user(&db, user_a)
3798            .await
3799            .unwrap();
3800        assert_eq!(
3801            count_a_after, 0,
3802            "user A's search history should be deleted"
3803        );
3804
3805        // User B's search history should remain
3806        let count_b_after = ops::search_history::count_queries_by_user(&db, user_b)
3807            .await
3808            .unwrap();
3809        assert_eq!(
3810            count_b_after, 2,
3811            "user B's search history should be untouched"
3812        );
3813    }
3814
3815    #[tokio::test]
3816    async fn all_scoped_delete_clears_all_search_history() {
3817        let (svc, storage, db) = make_service().await;
3818        let user_a = Uuid::new_v4();
3819        let user_b = Uuid::new_v4();
3820
3821        seed_dataset_with_data(&db, &storage, user_a, "sh_all_a_ds").await;
3822        seed_dataset_with_data(&db, &storage, user_b, "sh_all_b_ds").await;
3823
3824        seed_search_history(&db, user_a, 3).await;
3825        seed_search_history(&db, user_b, 2).await;
3826
3827        let total_before = ops::search_history::count_all_queries(&db).await.unwrap();
3828        assert_eq!(total_before, 5);
3829
3830        let result = svc
3831            .execute(&DeleteRequest {
3832                scope: DeleteScope::All,
3833                mode: DeleteMode::Soft,
3834                memory_only: false,
3835            })
3836            .await
3837            .expect("execute should succeed");
3838
3839        assert_eq!(result.deleted_search_queries, 5);
3840
3841        let total_after = ops::search_history::count_all_queries(&db).await.unwrap();
3842        assert_eq!(
3843            total_after, 0,
3844            "all search history should be deleted after All-scoped delete"
3845        );
3846    }
3847
3848    #[tokio::test]
3849    async fn dataset_scoped_delete_does_not_touch_search_history() {
3850        let (svc, storage, db) = make_service().await;
3851        let owner = Uuid::new_v4();
3852
3853        seed_dataset_with_data(&db, &storage, owner, "sh_ds_notouch").await;
3854        seed_search_history(&db, owner, 4).await;
3855
3856        let count_before = ops::search_history::count_queries_by_user(&db, owner)
3857            .await
3858            .unwrap();
3859        assert_eq!(count_before, 4);
3860
3861        let result = svc
3862            .execute(&DeleteRequest {
3863                scope: DeleteScope::Dataset {
3864                    owner_id: owner,
3865                    dataset_name: "sh_ds_notouch".to_string(),
3866                },
3867                mode: DeleteMode::Soft,
3868                memory_only: false,
3869            })
3870            .await
3871            .expect("execute should succeed");
3872
3873        assert_eq!(
3874            result.deleted_search_queries, 0,
3875            "dataset-scoped delete should not touch search history"
3876        );
3877
3878        let count_after = ops::search_history::count_queries_by_user(&db, owner)
3879            .await
3880            .unwrap();
3881        assert_eq!(
3882            count_after, 4,
3883            "search history should be untouched after dataset deletion"
3884        );
3885    }
3886
3887    #[tokio::test]
3888    async fn preview_shows_search_history_count() {
3889        let (svc, storage, db) = make_service().await;
3890        let owner = Uuid::new_v4();
3891
3892        seed_dataset_with_data(&db, &storage, owner, "sh_preview_ds").await;
3893        seed_search_history(&db, owner, 5).await;
3894
3895        // User-scoped preview
3896        let preview = svc
3897            .preview(&DeleteRequest {
3898                scope: DeleteScope::User { owner_id: owner },
3899                mode: DeleteMode::Soft,
3900                memory_only: false,
3901            })
3902            .await
3903            .expect("preview should succeed");
3904
3905        assert_eq!(
3906            preview.search_queries_to_delete, 5,
3907            "preview should show correct search query count for user scope"
3908        );
3909
3910        // All-scoped preview
3911        let preview_all = svc
3912            .preview(&DeleteRequest {
3913                scope: DeleteScope::All,
3914                mode: DeleteMode::Soft,
3915                memory_only: false,
3916            })
3917            .await
3918            .expect("preview should succeed");
3919
3920        assert_eq!(
3921            preview_all.search_queries_to_delete, 5,
3922            "preview should show correct search query count for All scope"
3923        );
3924
3925        // Dataset-scoped preview should show 0
3926        let preview_ds = svc
3927            .preview(&DeleteRequest {
3928                scope: DeleteScope::Dataset {
3929                    owner_id: owner,
3930                    dataset_name: "sh_preview_ds".to_string(),
3931                },
3932                mode: DeleteMode::Soft,
3933                memory_only: false,
3934            })
3935            .await
3936            .expect("preview should succeed");
3937
3938        assert_eq!(
3939            preview_ds.search_queries_to_delete, 0,
3940            "dataset-scoped preview should show 0 search queries"
3941        );
3942    }
3943
3944    // ------------------------------------------------------------------
3945    // Session prune tests
3946    // ------------------------------------------------------------------
3947
3948    /// Minimal mock session store that tracks whether `prune()` was called.
3949    struct MockSessionStore {
3950        pruned: std::sync::Mutex<bool>,
3951    }
3952
3953    impl MockSessionStore {
3954        fn new() -> Self {
3955            Self {
3956                pruned: std::sync::Mutex::new(false),
3957            }
3958        }
3959
3960        fn was_pruned(&self) -> bool {
3961            *self.pruned.lock().expect("lock poison is unrecoverable")
3962        }
3963    }
3964
3965    #[async_trait::async_trait]
3966    impl SessionStore for MockSessionStore {
3967        async fn create_qa_entry(
3968            &self,
3969            _session_id: &str,
3970            _user_id: Option<&str>,
3971            _question: &str,
3972            _answer: &str,
3973            _context: Option<&str>,
3974        ) -> Result<String, cognee_session::SessionError> {
3975            Ok("mock-qa-id".to_string())
3976        }
3977
3978        async fn get_latest_qa_entries(
3979            &self,
3980            _session_id: &str,
3981            _user_id: Option<&str>,
3982            _last_n: usize,
3983        ) -> Result<Vec<cognee_session::SessionQAEntry>, cognee_session::SessionError> {
3984            Ok(vec![])
3985        }
3986
3987        async fn get_all_qa_entries(
3988            &self,
3989            _session_id: &str,
3990            _user_id: Option<&str>,
3991        ) -> Result<Vec<cognee_session::SessionQAEntry>, cognee_session::SessionError> {
3992            Ok(vec![])
3993        }
3994
3995        async fn delete_session(
3996            &self,
3997            _session_id: &str,
3998            _user_id: Option<&str>,
3999        ) -> Result<bool, cognee_session::SessionError> {
4000            Ok(true)
4001        }
4002
4003        async fn delete_qa_entry(
4004            &self,
4005            _session_id: &str,
4006            _user_id: Option<&str>,
4007            _qa_id: &str,
4008        ) -> Result<bool, cognee_session::SessionError> {
4009            Ok(true)
4010        }
4011
4012        async fn prune(&self) -> Result<(), cognee_session::SessionError> {
4013            *self.pruned.lock().expect("lock poison is unrecoverable") = true;
4014            Ok(())
4015        }
4016
4017        async fn update_qa_entry(
4018            &self,
4019            _session_id: &str,
4020            _user_id: Option<&str>,
4021            _qa_id: &str,
4022            _updates: cognee_session::SessionQAUpdate,
4023        ) -> Result<bool, cognee_session::SessionError> {
4024            Ok(true)
4025        }
4026
4027        async fn get_graph_context(
4028            &self,
4029            _session_id: &str,
4030            _user_id: Option<&str>,
4031        ) -> Result<Option<String>, cognee_session::SessionError> {
4032            Ok(None)
4033        }
4034
4035        async fn set_graph_context(
4036            &self,
4037            _session_id: &str,
4038            _user_id: Option<&str>,
4039            _context: &str,
4040        ) -> Result<(), cognee_session::SessionError> {
4041            Ok(())
4042        }
4043    }
4044
4045    #[tokio::test]
4046    async fn delete_all_prunes_session_store() {
4047        let (svc, _storage, _db) = make_service().await;
4048        let session_store = Arc::new(MockSessionStore::new());
4049        let svc = svc.with_session_store(session_store.clone() as Arc<dyn SessionStore>);
4050
4051        let result = svc
4052            .execute(&DeleteRequest {
4053                scope: DeleteScope::All,
4054                mode: DeleteMode::Soft,
4055                memory_only: false,
4056            })
4057            .await
4058            .expect("delete all should succeed");
4059
4060        assert!(
4061            session_store.was_pruned(),
4062            "session store prune() should have been called"
4063        );
4064        assert!(
4065            result.pruned_sessions,
4066            "result should indicate sessions were pruned"
4067        );
4068    }
4069
4070    #[tokio::test]
4071    async fn delete_dataset_does_not_prune_sessions() {
4072        let (svc, storage, db) = make_service().await;
4073        let session_store = Arc::new(MockSessionStore::new());
4074        let owner_id = Uuid::new_v4();
4075        let _ = seed_dataset_with_data(&db, &storage, owner_id, "test_ds").await;
4076        let svc = svc.with_session_store(session_store.clone() as Arc<dyn SessionStore>);
4077
4078        let result = svc
4079            .execute(&DeleteRequest {
4080                scope: DeleteScope::Dataset {
4081                    owner_id,
4082                    dataset_name: "test_ds".to_string(),
4083                },
4084                mode: DeleteMode::Soft,
4085                memory_only: false,
4086            })
4087            .await
4088            .expect("delete dataset should succeed");
4089
4090        assert!(
4091            !session_store.was_pruned(),
4092            "session store prune() should NOT be called for dataset-scoped deletion"
4093        );
4094        assert!(
4095            !result.pruned_sessions,
4096            "result should indicate sessions were NOT pruned"
4097        );
4098    }
4099
4100    #[tokio::test]
4101    async fn delete_all_without_session_store_skips_prune() {
4102        let (svc, _storage, _db) = make_service().await;
4103        // No session store configured
4104
4105        let result = svc
4106            .execute(&DeleteRequest {
4107                scope: DeleteScope::All,
4108                mode: DeleteMode::Soft,
4109                memory_only: false,
4110            })
4111            .await
4112            .expect("delete all should succeed");
4113
4114        assert!(
4115            !result.pruned_sessions,
4116            "result should indicate sessions were NOT pruned when no store is configured"
4117        );
4118    }
4119
4120    // ------------------------------------------------------------------
4121    // Orphaned EdgeType cleanup tests (Gap 09)
4122    // ------------------------------------------------------------------
4123
4124    #[tokio::test]
4125    async fn hard_delete_removes_orphaned_edge_type_nodes() {
4126        let (_svc, storage, db, graph_db, vector_db) = make_service_with_graph_vector().await;
4127        let owner = Uuid::new_v4();
4128        let (dataset_id, data_id) =
4129            seed_dataset_with_data(&db, &storage, owner, "edge_type_ds").await;
4130
4131        // Seed provenance so graph/vector cleanup can find something
4132        let node_slug = Uuid::new_v4();
4133        let edge_slug = Uuid::new_v4();
4134        seed_provenance_nodes(
4135            &db,
4136            dataset_id,
4137            data_id,
4138            owner,
4139            &[node_slug],
4140            "Entity",
4141            serde_json::json!(["name"]),
4142        )
4143        .await;
4144        seed_provenance_edges(&db, dataset_id, data_id, owner, &[edge_slug], "works_at").await;
4145
4146        // Add entity node + EdgeType node to the graph
4147        let edge_type_id = cognee_models::EdgeType::deterministic_id("works_at");
4148        graph_db
4149            .add_node_raw(serde_json::json!({
4150                "id": node_slug.to_string(),
4151                "type": "Entity",
4152                "name": "Alice"
4153            }))
4154            .await
4155            .unwrap();
4156        graph_db
4157            .add_node_raw(serde_json::json!({
4158                "id": edge_type_id.to_string(),
4159                "type": "EdgeType",
4160                "relationship_name": "works_at"
4161            }))
4162            .await
4163            .unwrap();
4164
4165        // Add an edge between the entity and something (so the EdgeType is "in use")
4166        graph_db
4167            .add_edge(&node_slug.to_string(), "some_target", "works_at", None)
4168            .await
4169            .unwrap();
4170
4171        assert_eq!(graph_db.node_count(), 2);
4172        assert_eq!(graph_db.edge_count(), 1);
4173
4174        // Add EdgeType vector point
4175        vector_db
4176            .create_collection("EdgeType", "relationship_name", 3)
4177            .await
4178            .unwrap();
4179        let et_point = cognee_vector::VectorPoint::new(edge_type_id, vec![1.0, 0.0, 0.0]);
4180        vector_db
4181            .index_points("EdgeType", "relationship_name", &[et_point])
4182            .await
4183            .unwrap();
4184
4185        // Execute hard delete - this should:
4186        // 1. Delete the entity node from graph (via provenance)
4187        // 2. After entity deletion, the edge "works_at" is gone (MockGraphDB doesn't
4188        //    cascade edges on node delete, so we simulate that). Actually MockGraphDB
4189        //    delete_nodes only removes nodes, not edges. For this test, let's manually
4190        //    remove the edge to simulate what Ladybug would do.
4191        //
4192        // Actually, let's restructure: make the EdgeType orphaned from the start
4193        // by NOT having any edges with that relationship name in the graph.
4194        graph_db.clear();
4195
4196        // Re-add just the orphaned EdgeType node (no edges at all)
4197        graph_db
4198            .add_node_raw(serde_json::json!({
4199                "id": edge_type_id.to_string(),
4200                "type": "EdgeType",
4201                "relationship_name": "works_at"
4202            }))
4203            .await
4204            .unwrap();
4205        assert_eq!(graph_db.node_count(), 1);
4206        assert_eq!(graph_db.edge_count(), 0);
4207
4208        // Re-seed provenance with no nodes (graph was cleared)
4209        // We need the dataset to still exist for the delete to work.
4210        // Re-create it fresh.
4211        let db2 = connect("sqlite::memory:").await.unwrap();
4212        initialize(&db2).await.unwrap();
4213        let db2 = Arc::new(db2);
4214        let storage2 = Arc::new(MockStorage::new());
4215        let svc2 = DeleteService::new(
4216            storage2.clone() as Arc<dyn StorageTrait>,
4217            db2.clone() as Arc<dyn DeleteDb>,
4218        )
4219        .with_graph_db(graph_db.clone() as Arc<dyn GraphDBTrait>)
4220        .with_vector_db(vector_db.clone() as Arc<dyn VectorDB>);
4221
4222        let (dataset_id2, _data_id2) =
4223            seed_dataset_with_data(&db2, &storage2, owner, "edge_type_ds").await;
4224
4225        // Seed a provenance node so there's something to cleanup in phase 1
4226        let node_slug2 = Uuid::new_v4();
4227        seed_provenance_nodes(
4228            &db2,
4229            dataset_id2,
4230            _data_id2,
4231            owner,
4232            &[node_slug2],
4233            "Entity",
4234            serde_json::json!(["name"]),
4235        )
4236        .await;
4237
4238        // Add the provenance node to the graph too
4239        graph_db
4240            .add_node_raw(serde_json::json!({
4241                "id": node_slug2.to_string(),
4242                "type": "Entity",
4243                "name": "Bob"
4244            }))
4245            .await
4246            .unwrap();
4247
4248        // Now: graph has 2 nodes (Entity "Bob" + orphaned EdgeType "works_at"), 0 edges
4249        assert_eq!(graph_db.node_count(), 2);
4250
4251        // Execute hard delete
4252        let result = svc2
4253            .execute(&DeleteRequest {
4254                scope: DeleteScope::Dataset {
4255                    owner_id: owner,
4256                    dataset_name: "edge_type_ds".to_string(),
4257                },
4258                mode: DeleteMode::Hard,
4259                memory_only: false,
4260            })
4261            .await
4262            .expect("execute should succeed");
4263
4264        // The entity node was deleted via provenance. The EdgeType node should
4265        // be swept as an orphan because it has degree 0 and its relationship_name
4266        // is not in any edge.
4267        assert_eq!(
4268            result.deleted_orphan_edge_types, 1,
4269            "orphaned EdgeType should be cleaned up"
4270        );
4271        assert_eq!(
4272            graph_db.node_count(),
4273            0,
4274            "all nodes should be gone (entity via provenance, EdgeType via orphan sweep)"
4275        );
4276
4277        // Vector point should also be deleted
4278        assert_eq!(
4279            vector_db
4280                .collection_size("EdgeType", "relationship_name")
4281                .await
4282                .unwrap(),
4283            0,
4284            "EdgeType vector point should be removed by orphan sweep"
4285        );
4286    }
4287
4288    #[tokio::test]
4289    async fn shared_edge_type_survives_when_edges_remain() {
4290        let (_svc, _storage, _db, graph_db, vector_db) = make_service_with_graph_vector().await;
4291
4292        // Setup: EdgeType "works_at" with edges still present in the graph
4293        let edge_type_id = cognee_models::EdgeType::deterministic_id("works_at");
4294        graph_db
4295            .add_node_raw(serde_json::json!({
4296                "id": edge_type_id.to_string(),
4297                "type": "EdgeType",
4298                "relationship_name": "works_at"
4299            }))
4300            .await
4301            .unwrap();
4302        graph_db
4303            .add_node_raw(serde_json::json!({"id": "e1", "type": "Entity", "name": "Alice"}))
4304            .await
4305            .unwrap();
4306        graph_db
4307            .add_node_raw(serde_json::json!({"id": "e2", "type": "Entity", "name": "Bob"}))
4308            .await
4309            .unwrap();
4310        // An edge of type "works_at" still exists
4311        graph_db
4312            .add_edge("e1", "e2", "works_at", None)
4313            .await
4314            .unwrap();
4315
4316        // Create vector collection
4317        vector_db
4318            .create_collection("EdgeType", "relationship_name", 3)
4319            .await
4320            .unwrap();
4321        let et_point = cognee_vector::VectorPoint::new(edge_type_id, vec![1.0, 0.0, 0.0]);
4322        vector_db
4323            .index_points("EdgeType", "relationship_name", &[et_point])
4324            .await
4325            .unwrap();
4326
4327        // The zero-degree check should NOT find this EdgeType as orphaned
4328        let orphans = graph_db.get_zero_degree_edge_type_nodes().await.unwrap();
4329        assert!(
4330            orphans.is_empty(),
4331            "EdgeType with active edges should not be considered orphaned"
4332        );
4333
4334        // The EdgeType node should still exist
4335        assert!(
4336            graph_db.has_node(&edge_type_id.to_string()).await.unwrap(),
4337            "EdgeType node should survive"
4338        );
4339        assert_eq!(
4340            vector_db
4341                .collection_size("EdgeType", "relationship_name")
4342                .await
4343                .unwrap(),
4344            1,
4345            "EdgeType vector point should survive"
4346        );
4347    }
4348
4349    #[tokio::test]
4350    async fn orphan_edge_type_detected_when_no_edges_exist() {
4351        let (_svc, _storage, _db, graph_db, _vector_db) = make_service_with_graph_vector().await;
4352
4353        // Add an EdgeType node with NO corresponding edges
4354        let edge_type_id = cognee_models::EdgeType::deterministic_id("obsolete_rel");
4355        graph_db
4356            .add_node_raw(serde_json::json!({
4357                "id": edge_type_id.to_string(),
4358                "type": "EdgeType",
4359                "relationship_name": "obsolete_rel"
4360            }))
4361            .await
4362            .unwrap();
4363
4364        // Add a non-EdgeType node (should be ignored by the sweep)
4365        graph_db
4366            .add_node_raw(serde_json::json!({"id": "e1", "type": "Entity", "name": "Alice"}))
4367            .await
4368            .unwrap();
4369
4370        let orphans = graph_db.get_zero_degree_edge_type_nodes().await.unwrap();
4371        assert_eq!(
4372            orphans.len(),
4373            1,
4374            "should detect exactly one orphaned EdgeType"
4375        );
4376        assert_eq!(orphans[0].0, edge_type_id.to_string());
4377    }
4378
4379    #[tokio::test]
4380    async fn edge_type_with_matching_rel_name_in_edges_not_orphaned() {
4381        let (_svc, _storage, _db, graph_db, _vector_db) = make_service_with_graph_vector().await;
4382
4383        // Add an EdgeType node
4384        let edge_type_id = cognee_models::EdgeType::deterministic_id("knows");
4385        graph_db
4386            .add_node_raw(serde_json::json!({
4387                "id": edge_type_id.to_string(),
4388                "type": "EdgeType",
4389                "relationship_name": "knows"
4390            }))
4391            .await
4392            .unwrap();
4393
4394        // The EdgeType node itself has no edges (degree 0), but there are
4395        // other edges in the graph with relationship_name "knows"
4396        graph_db
4397            .add_node_raw(serde_json::json!({"id": "a", "type": "Entity", "name": "A"}))
4398            .await
4399            .unwrap();
4400        graph_db
4401            .add_node_raw(serde_json::json!({"id": "b", "type": "Entity", "name": "B"}))
4402            .await
4403            .unwrap();
4404        graph_db.add_edge("a", "b", "knows", None).await.unwrap();
4405
4406        let orphans = graph_db.get_zero_degree_edge_type_nodes().await.unwrap();
4407        assert!(
4408            orphans.is_empty(),
4409            "EdgeType should not be orphaned when edges with its relationship_name exist"
4410        );
4411    }
4412
4413    #[tokio::test]
4414    async fn soft_delete_does_not_sweep_orphan_edge_types() {
4415        let (svc, storage, db, graph_db, _vector_db) = make_service_with_graph_vector().await;
4416        let owner = Uuid::new_v4();
4417        let (_dataset_id, _data_id) = seed_dataset_with_data(&db, &storage, owner, "soft_ds").await;
4418
4419        // Add an orphaned EdgeType node
4420        let edge_type_id = cognee_models::EdgeType::deterministic_id("stale_rel");
4421        graph_db
4422            .add_node_raw(serde_json::json!({
4423                "id": edge_type_id.to_string(),
4424                "type": "EdgeType",
4425                "relationship_name": "stale_rel"
4426            }))
4427            .await
4428            .unwrap();
4429
4430        // Soft delete should NOT trigger orphan sweep
4431        let result = svc
4432            .execute(&DeleteRequest {
4433                scope: DeleteScope::Dataset {
4434                    owner_id: owner,
4435                    dataset_name: "soft_ds".to_string(),
4436                },
4437                mode: DeleteMode::Soft,
4438                memory_only: false,
4439            })
4440            .await
4441            .expect("execute should succeed");
4442
4443        assert_eq!(
4444            result.deleted_orphan_edge_types, 0,
4445            "soft delete should not sweep orphan EdgeTypes"
4446        );
4447        assert!(
4448            graph_db.has_node(&edge_type_id.to_string()).await.unwrap(),
4449            "orphaned EdgeType should still exist after soft delete"
4450        );
4451    }
4452
4453    // ------------------------------------------------------------------
4454    // delete_dataset_if_empty flag tests
4455    // ------------------------------------------------------------------
4456
4457    #[tokio::test]
4458    async fn delete_data_with_flag_deletes_empty_dataset() {
4459        let (svc, storage, db) = make_service().await;
4460        let owner = Uuid::new_v4();
4461        let (_dataset_id, data_id) =
4462            seed_dataset_with_data(&db, &storage, owner, "auto_del_ds").await;
4463
4464        let result = svc
4465            .execute(&DeleteRequest {
4466                scope: DeleteScope::Data {
4467                    owner_id: owner,
4468                    data_id,
4469                    dataset_name: Some("auto_del_ds".to_string()),
4470                    delete_dataset_if_empty: true,
4471                },
4472                mode: DeleteMode::Soft,
4473                memory_only: false,
4474            })
4475            .await
4476            .expect("execute should succeed");
4477
4478        assert_eq!(result.deleted_data, 1, "data should be deleted");
4479        assert_eq!(
4480            result.deleted_datasets, 1,
4481            "dataset should be auto-deleted because it became empty"
4482        );
4483
4484        // Verify the dataset is actually gone from the DB
4485        let ds = ops::datasets::get_dataset_by_name(&db, "auto_del_ds", owner, None)
4486            .await
4487            .unwrap();
4488        assert!(ds.is_none(), "dataset should be gone from DB");
4489    }
4490
4491    #[tokio::test]
4492    async fn delete_data_with_flag_keeps_nonempty_dataset() {
4493        let (svc, storage, db) = make_service().await;
4494        let owner = Uuid::new_v4();
4495
4496        // Create one dataset with two data items
4497        let dataset = Dataset::new("multi_data_ds".to_string(), owner, None, Uuid::new_v4());
4498        let dataset_id = dataset.id;
4499        ops::datasets::create_dataset(&db, dataset).await.unwrap();
4500
4501        let loc1 = storage.store(b"content one", "one.txt").await.unwrap();
4502        let data_id_1 = Uuid::new_v4();
4503        let data1 = Data::builder(
4504            data_id_1,
4505            "one.txt",
4506            loc1,
4507            "file://one.txt",
4508            "txt",
4509            "text/plain",
4510            "hash_one",
4511            owner,
4512        )
4513        .build();
4514        ops::data::create_data(&db, data1).await.unwrap();
4515        ops::datasets::attach_data_to_dataset(&db, dataset_id, data_id_1)
4516            .await
4517            .unwrap();
4518
4519        let loc2 = storage.store(b"content two", "two.txt").await.unwrap();
4520        let data_id_2 = Uuid::new_v4();
4521        let data2 = Data::builder(
4522            data_id_2,
4523            "two.txt",
4524            loc2,
4525            "file://two.txt",
4526            "txt",
4527            "text/plain",
4528            "hash_two",
4529            owner,
4530        )
4531        .build();
4532        ops::data::create_data(&db, data2).await.unwrap();
4533        ops::datasets::attach_data_to_dataset(&db, dataset_id, data_id_2)
4534            .await
4535            .unwrap();
4536
4537        // Delete one data item with the flag set
4538        let result = svc
4539            .execute(&DeleteRequest {
4540                scope: DeleteScope::Data {
4541                    owner_id: owner,
4542                    data_id: data_id_1,
4543                    dataset_name: Some("multi_data_ds".to_string()),
4544                    delete_dataset_if_empty: true,
4545                },
4546                mode: DeleteMode::Soft,
4547                memory_only: false,
4548            })
4549            .await
4550            .expect("execute should succeed");
4551
4552        assert_eq!(result.deleted_data, 1, "data should be deleted");
4553        assert_eq!(
4554            result.deleted_datasets, 0,
4555            "dataset should survive because it still has data_id_2"
4556        );
4557
4558        // Verify the dataset still exists
4559        let ds = ops::datasets::get_dataset_by_name(&db, "multi_data_ds", owner, None)
4560            .await
4561            .unwrap();
4562        assert!(ds.is_some(), "dataset should still exist in DB");
4563    }
4564
4565    #[tokio::test]
4566    async fn delete_data_without_flag_keeps_empty_dataset() {
4567        let (svc, storage, db) = make_service().await;
4568        let owner = Uuid::new_v4();
4569        let (_dataset_id, data_id) =
4570            seed_dataset_with_data(&db, &storage, owner, "no_flag_ds").await;
4571
4572        let result = svc
4573            .execute(&DeleteRequest {
4574                scope: DeleteScope::Data {
4575                    owner_id: owner,
4576                    data_id,
4577                    dataset_name: Some("no_flag_ds".to_string()),
4578                    delete_dataset_if_empty: false,
4579                },
4580                mode: DeleteMode::Soft,
4581                memory_only: false,
4582            })
4583            .await
4584            .expect("execute should succeed");
4585
4586        assert_eq!(result.deleted_data, 1, "data should be deleted");
4587        assert_eq!(
4588            result.deleted_datasets, 0,
4589            "dataset should survive because flag is false"
4590        );
4591
4592        // Verify the dataset still exists (even though it's now empty)
4593        let ds = ops::datasets::get_dataset_by_name(&db, "no_flag_ds", owner, None)
4594            .await
4595            .unwrap();
4596        assert!(
4597            ds.is_some(),
4598            "dataset should still exist despite being empty"
4599        );
4600    }
4601
4602    // ------------------------------------------------------------------
4603    // tenant_id filtering in DeleteDb::get_dataset_by_name
4604    // ------------------------------------------------------------------
4605
4606    #[tokio::test]
4607    async fn delete_db_get_dataset_by_name_filters_by_tenant_id() {
4608        use cognee_database::DeleteDb;
4609
4610        let db = cognee_database::connect("sqlite::memory:").await.unwrap();
4611        cognee_database::initialize(&db).await.unwrap();
4612
4613        let owner = Uuid::new_v4();
4614        let tenant_a = Uuid::new_v4();
4615        let tenant_b = Uuid::new_v4();
4616
4617        // Create two datasets with the same name and owner but different tenants.
4618        let ds_a = Dataset::new(
4619            "shared_name".to_string(),
4620            owner,
4621            Some(tenant_a),
4622            Uuid::new_v4(),
4623        );
4624        let ds_b = Dataset::new(
4625            "shared_name".to_string(),
4626            owner,
4627            Some(tenant_b),
4628            Uuid::new_v4(),
4629        );
4630        ops::datasets::create_dataset(&db, ds_a.clone())
4631            .await
4632            .unwrap();
4633        ops::datasets::create_dataset(&db, ds_b.clone())
4634            .await
4635            .unwrap();
4636
4637        // Without tenant_id (None), the query returns one of them (ambiguous).
4638        let any = db
4639            .get_dataset_by_name("shared_name", owner, None)
4640            .await
4641            .unwrap();
4642        assert!(any.is_some(), "should find at least one dataset");
4643
4644        // With tenant_a, only the first dataset is returned.
4645        let found_a = db
4646            .get_dataset_by_name("shared_name", owner, Some(tenant_a))
4647            .await
4648            .unwrap();
4649        assert_eq!(
4650            found_a.as_ref().map(|d| d.id),
4651            Some(ds_a.id),
4652            "should find tenant_a's dataset"
4653        );
4654
4655        // With tenant_b, only the second dataset is returned.
4656        let found_b = db
4657            .get_dataset_by_name("shared_name", owner, Some(tenant_b))
4658            .await
4659            .unwrap();
4660        assert_eq!(
4661            found_b.as_ref().map(|d| d.id),
4662            Some(ds_b.id),
4663            "should find tenant_b's dataset"
4664        );
4665
4666        // With a nonexistent tenant_id, nothing is returned.
4667        let found_none = db
4668            .get_dataset_by_name("shared_name", owner, Some(Uuid::new_v4()))
4669            .await
4670            .unwrap();
4671        assert!(
4672            found_none.is_none(),
4673            "should find no dataset for unknown tenant"
4674        );
4675    }
4676
4677    // ------------------------------------------------------------------
4678    // Memory-only tests (task 18)
4679    // ------------------------------------------------------------------
4680
4681    /// Verify that `memory_only: true` removes graph/vector artifacts but
4682    /// leaves the `Dataset` row, `Data` row, and stored file intact.
4683    #[tokio::test]
4684    async fn memory_only_dataset_preserves_rows_and_files() {
4685        let (svc, storage, db, graph_db, vector_db) = make_service_with_graph_vector().await;
4686        let owner = Uuid::new_v4();
4687        let (dataset_id, data_id) =
4688            seed_dataset_with_data(&db, &storage, owner, "memory_only_ds").await;
4689
4690        // Seed provenance nodes + graph/vector artifacts.
4691        let slug = Uuid::new_v4();
4692        seed_provenance_nodes(
4693            &db,
4694            dataset_id,
4695            data_id,
4696            owner,
4697            &[slug],
4698            "Entity",
4699            serde_json::json!(["name"]),
4700        )
4701        .await;
4702        graph_db
4703            .add_node_raw(serde_json::json!({"id": slug.to_string(), "name": "TestNode"}))
4704            .await
4705            .unwrap();
4706        vector_db
4707            .create_collection("Entity", "name", 3)
4708            .await
4709            .unwrap();
4710        vector_db
4711            .index_points(
4712                "Entity",
4713                "name",
4714                &[cognee_vector::VectorPoint::new(slug, vec![1.0, 0.0, 0.0])],
4715            )
4716            .await
4717            .unwrap();
4718
4719        // Seed pipeline_status with BOTH an add_pipeline and a cognify_pipeline
4720        // entry keyed by this dataset. Python's `_forget_dataset_memory` removes
4721        // the dataset_id entry from EVERY pipeline on each Data record.
4722        let dataset_id_hex = cognee_database::uuid_hex::to_hex(dataset_id);
4723        let status_json = serde_json::json!({
4724            "add_pipeline": { dataset_id_hex.clone(): "DATASET_PROCESSING_COMPLETED" },
4725            "cognify_pipeline": { dataset_id_hex.clone(): "DATASET_PROCESSING_COMPLETED" },
4726        });
4727        let data_rec = ops::data::get_data(&db, data_id).await.unwrap().unwrap();
4728        ops::data::update_data(
4729            &db,
4730            Data {
4731                pipeline_status: Some(status_json.to_string()),
4732                ..data_rec
4733            },
4734        )
4735        .await
4736        .unwrap();
4737
4738        let result = svc
4739            .execute(&DeleteRequest {
4740                scope: DeleteScope::Dataset {
4741                    owner_id: owner,
4742                    dataset_name: "memory_only_ds".to_string(),
4743                },
4744                mode: DeleteMode::Soft,
4745                memory_only: true,
4746            })
4747            .await
4748            .expect("memory-only execute should succeed");
4749
4750        // Graph/vector cleared.
4751        assert_eq!(
4752            result.deleted_graph_nodes, 1,
4753            "graph node should be cleared"
4754        );
4755        assert_eq!(
4756            result.deleted_vector_points, 1,
4757            "vector point should be cleared"
4758        );
4759
4760        // No relational rows deleted.
4761        assert_eq!(result.deleted_datasets, 0, "dataset must not be deleted");
4762        assert_eq!(result.deleted_data, 0, "data must not be deleted");
4763        assert_eq!(
4764            result.deleted_storage_files, 0,
4765            "storage file must not be deleted"
4766        );
4767
4768        // Dataset row still exists in the DB.
4769        let ds_still = ops::datasets::get_dataset_by_name(&db, "memory_only_ds", owner, None)
4770            .await
4771            .unwrap();
4772        assert!(
4773            ds_still.is_some(),
4774            "Dataset row must survive memory-only forget"
4775        );
4776
4777        // Data row still exists.
4778        let data_still = ops::data::get_data(&db, data_id).await.unwrap();
4779        assert!(
4780            data_still.is_some(),
4781            "Data row must survive memory-only forget"
4782        );
4783
4784        // Python parity (`_forget_dataset_memory`): the dataset_id entry is
4785        // removed from EVERY pipeline on the Data record (add + cognify). Both
4786        // inner maps become empty, so the whole pipeline_status is cleared.
4787        let ps = data_still.unwrap().pipeline_status;
4788        let cleared = match ps {
4789            None => true,
4790            Some(s) => {
4791                let v: serde_json::Value = serde_json::from_str(&s).unwrap();
4792                let add_has = v
4793                    .get("add_pipeline")
4794                    .and_then(|p| p.as_object())
4795                    .map(|m| m.contains_key(&dataset_id_hex))
4796                    .unwrap_or(false);
4797                let cog_has = v
4798                    .get("cognify_pipeline")
4799                    .and_then(|p| p.as_object())
4800                    .map(|m| m.contains_key(&dataset_id_hex))
4801                    .unwrap_or(false);
4802                !add_has && !cog_has
4803            }
4804        };
4805        assert!(
4806            cleared,
4807            "dataset_id entry must be removed from ALL pipelines (add + cognify) on the data record"
4808        );
4809    }
4810
4811    /// Verify that `memory_only: true` on a data-item scope also preserves
4812    /// the `Data` row and `Dataset` row.
4813    #[tokio::test]
4814    async fn memory_only_data_item_preserves_rows() {
4815        let (svc, storage, db, _graph_db, _vector_db) = make_service_with_graph_vector().await;
4816        let owner = Uuid::new_v4();
4817        let (dataset_id, data_id) =
4818            seed_dataset_with_data(&db, &storage, owner, "memory_only_item_ds").await;
4819
4820        // Add a SECOND data item to the same dataset (a sibling) to prove the
4821        // data-item path only touches the targeted record, not the whole
4822        // dataset (Python `_forget_data_memory` filters on `Data.id == data_id`).
4823        let sibling_loc = storage.store(b"sibling", "sibling.txt").await.unwrap();
4824        let sibling_id = Uuid::new_v4();
4825        let sibling = Data::builder(
4826            sibling_id,
4827            "sibling.txt",
4828            sibling_loc,
4829            "file://sibling.txt",
4830            "txt",
4831            "text/plain",
4832            "sibling_hash",
4833            owner,
4834        )
4835        .build();
4836        ops::data::create_data(&db, sibling).await.unwrap();
4837        ops::datasets::attach_data_to_dataset(&db, dataset_id, sibling_id)
4838            .await
4839            .unwrap();
4840
4841        let dataset_id_hex = cognee_database::uuid_hex::to_hex(dataset_id);
4842        // Target record: keep add_pipeline, remove only cognify_pipeline.
4843        let target_status = serde_json::json!({
4844            "add_pipeline": { dataset_id_hex.clone(): "DATASET_PROCESSING_COMPLETED" },
4845            "cognify_pipeline": { dataset_id_hex.clone(): "DATASET_PROCESSING_COMPLETED" },
4846        });
4847        let target_rec = ops::data::get_data(&db, data_id).await.unwrap().unwrap();
4848        ops::data::update_data(
4849            &db,
4850            Data {
4851                pipeline_status: Some(target_status.to_string()),
4852                ..target_rec
4853            },
4854        )
4855        .await
4856        .unwrap();
4857        // Sibling record: cognify status must be left untouched.
4858        let sibling_status = serde_json::json!({
4859            "cognify_pipeline": { dataset_id_hex.clone(): "DATASET_PROCESSING_COMPLETED" },
4860        });
4861        let sibling_rec = ops::data::get_data(&db, sibling_id).await.unwrap().unwrap();
4862        ops::data::update_data(
4863            &db,
4864            Data {
4865                pipeline_status: Some(sibling_status.to_string()),
4866                ..sibling_rec
4867            },
4868        )
4869        .await
4870        .unwrap();
4871
4872        let result = svc
4873            .execute(&DeleteRequest {
4874                scope: DeleteScope::Data {
4875                    owner_id: owner,
4876                    data_id,
4877                    dataset_name: Some("memory_only_item_ds".to_string()),
4878                    delete_dataset_if_empty: false,
4879                },
4880                mode: DeleteMode::Soft,
4881                memory_only: true,
4882            })
4883            .await
4884            .expect("memory-only data-item execute should succeed");
4885
4886        // No relational rows deleted.
4887        assert_eq!(result.deleted_data, 0, "data must not be deleted");
4888        assert_eq!(result.deleted_datasets, 0, "dataset must not be deleted");
4889        assert_eq!(
4890            result.deleted_storage_files, 0,
4891            "storage file must not be deleted"
4892        );
4893
4894        // Dataset row still exists.
4895        let ds_still = ops::datasets::get_dataset_by_name(&db, "memory_only_item_ds", owner, None)
4896            .await
4897            .unwrap();
4898        assert!(
4899            ds_still.is_some(),
4900            "Dataset row must survive data-item memory-only forget"
4901        );
4902
4903        // Data row still exists.
4904        let data_still = ops::data::get_data(&db, data_id).await.unwrap();
4905        assert!(
4906            data_still.is_some(),
4907            "Data row must survive data-item memory-only forget"
4908        );
4909
4910        // Target record: cognify removed, add_pipeline preserved.
4911        let target_after: serde_json::Value =
4912            serde_json::from_str(data_still.unwrap().pipeline_status.as_deref().unwrap()).unwrap();
4913        assert!(
4914            target_after.get("cognify_pipeline").is_none(),
4915            "cognify_pipeline must be removed from the targeted data record"
4916        );
4917        assert!(
4918            target_after
4919                .get("add_pipeline")
4920                .and_then(|p| p.as_object())
4921                .map(|m| m.contains_key(&dataset_id_hex))
4922                .unwrap_or(false),
4923            "add_pipeline status must be preserved on the targeted data record"
4924        );
4925
4926        // Sibling record: cognify status untouched (data-item path is scoped).
4927        let sibling_after: serde_json::Value = serde_json::from_str(
4928            ops::data::get_data(&db, sibling_id)
4929                .await
4930                .unwrap()
4931                .unwrap()
4932                .pipeline_status
4933                .as_deref()
4934                .unwrap(),
4935        )
4936        .unwrap();
4937        assert!(
4938            sibling_after
4939                .get("cognify_pipeline")
4940                .and_then(|p| p.as_object())
4941                .map(|m| m.contains_key(&dataset_id_hex))
4942                .unwrap_or(false),
4943            "sibling data record's cognify status must NOT be cleared by a data-item forget"
4944        );
4945    }
4946
4947    /// Regression test for Item 3 (B6.6): when the Data row is absent (custom
4948    /// graph model or orphaned graph data), `DeleteScope::Data` must succeed and
4949    /// return a best-effort cleanup result rather than a validation error.
4950    ///
4951    /// Python parity: `datasets.py:165-176`.
4952    #[tokio::test]
4953    async fn delete_data_without_relational_row_returns_success() {
4954        let (svc, _storage, _db) = make_service().await;
4955        let owner = Uuid::new_v4();
4956        let ghost_data_id = Uuid::new_v4();
4957
4958        // No dataset row, no data row — only the IDs exist.
4959        let result = svc
4960            .execute(&DeleteRequest {
4961                scope: DeleteScope::Data {
4962                    owner_id: owner,
4963                    data_id: ghost_data_id,
4964                    dataset_name: None,
4965                    delete_dataset_if_empty: false,
4966                },
4967                mode: DeleteMode::Soft,
4968                memory_only: false,
4969            })
4970            .await
4971            .expect("should succeed for a data_id with no relational row (custom graph model)");
4972
4973        // The core cleanup paths ran without error.  The call must not return a
4974        // Validation error — that is the key invariant.  `deleted_data` may be
4975        // 1 even when no relational row existed because the delete pipeline
4976        // attempts a best-effort relational DELETE and increments the counter
4977        // regardless of the underlying row count (SQLite DELETE WHERE succeeds
4978        // on a non-existent row with 0 rows affected, but still counted as
4979        // one delete attempt).
4980        assert!(
4981            result.deleted_data <= 1,
4982            "unexpected large deletion count for a ghost data_id: deleted_data={}",
4983            result.deleted_data
4984        );
4985    }
4986}