Skip to main content

cognee_lib/api/
datasets.rs

1//! High-level dataset management facade.
2//!
3//! [`DatasetManager`] composes the existing `IngestDb`, `DeleteDb`, and `AclDb`
4//! traits into a unified API matching the Python SDK's `datasets` class.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use cognee_database::{AclDb, DeleteDb, IngestDb, PipelineRunStatus};
10use cognee_delete::{DeleteMode, DeleteRequest, DeleteResult, DeleteScope, DeleteService};
11use cognee_ingestion::generate_dataset_id;
12use cognee_models::{Data, Dataset};
13use uuid::Uuid;
14
15use super::error::DatasetError;
16
/// Permission names granted on a dataset by
/// `DatasetManager::create_authorized_dataset`.
const DATASET_PERMISSIONS: [&str; 4] = ["read", "write", "delete", "share"];
18
19/// Combined trait for dataset operations.
20///
21/// Any `DatabaseConnection` implements both `IngestDb` and `DeleteDb`,
22/// so it automatically satisfies this super-trait.
23pub trait DatasetDb: IngestDb + DeleteDb + Send + Sync {}
24impl<T: IngestDb + DeleteDb + Send + Sync> DatasetDb for T {}
25
26/// High-level facade for dataset CRUD operations.
27///
28/// Wraps the low-level DB traits with optional ACL enforcement, matching
29/// the Python SDK's `datasets` class.
30pub struct DatasetManager {
31    db: Arc<dyn DatasetDb>,
32    acl_db: Option<Arc<dyn AclDb>>,
33}
34
35impl DatasetManager {
36    /// Create a new `DatasetManager` without ACL enforcement.
37    pub fn new(db: Arc<dyn DatasetDb>) -> Self {
38        Self { db, acl_db: None }
39    }
40
41    /// Enable ACL enforcement using the given ACL database.
42    pub fn with_acl(mut self, acl_db: Arc<dyn AclDb>) -> Self {
43        self.acl_db = Some(acl_db);
44        self
45    }
46
47    // ------------------------------------------------------------------
48    // Read operations
49    // ------------------------------------------------------------------
50
51    /// List all datasets accessible to the given owner.
52    ///
53    /// When ACL is configured, only datasets the owner has "read" permission
54    /// on are returned. Without ACL, all datasets owned by the user are listed.
55    pub async fn list_datasets(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatasetError> {
56        if let Some(acl) = &self.acl_db {
57            let authorized_ids = acl.authorized_dataset_ids(owner_id, "read").await?;
58            let mut datasets = Vec::with_capacity(authorized_ids.len());
59            for id in authorized_ids {
60                if let Some(ds) = self.db.get_dataset(id).await? {
61                    datasets.push(ds);
62                }
63            }
64            Ok(datasets)
65        } else {
66            Ok(IngestDb::list_datasets_by_owner(self.db.as_ref(), owner_id).await?)
67        }
68    }
69
70    /// List all data items in a dataset, with permission check.
71    ///
72    /// Results are sorted by `data_size` descending (largest first), matching
73    /// Python SDK behaviour.
74    pub async fn list_data(
75        &self,
76        dataset_id: Uuid,
77        owner_id: Uuid,
78    ) -> Result<Vec<Data>, DatasetError> {
79        self.check_read_permission(owner_id, dataset_id).await?;
80        let mut items = self.db.get_dataset_data(dataset_id).await?;
81        items.sort_by_key(|b| std::cmp::Reverse(b.data_size));
82        Ok(items)
83    }
84
85    /// Check whether a dataset contains any data items.
86    ///
87    /// Enforces read permission when ACL is configured, then uses an efficient
88    /// COUNT query instead of loading all records.
89    pub async fn has_data(&self, dataset_id: Uuid, owner_id: Uuid) -> Result<bool, DatasetError> {
90        self.check_read_permission(owner_id, dataset_id).await?;
91        let count = self.db.count_dataset_data(dataset_id).await?;
92        Ok(count > 0)
93    }
94
95    /// Get the latest pipeline status for each dataset, across all tracked pipelines.
96    ///
97    /// Returns a nested map `{ dataset_id → { pipeline_name → status } }`.
98    /// Datasets with no pipeline runs for a given pipeline are omitted from the
99    /// inner map (equivalent to Python's "not started" behaviour).
100    pub async fn get_status(
101        &self,
102        dataset_ids: &[Uuid],
103    ) -> Result<HashMap<Uuid, HashMap<String, PipelineRunStatus>>, DatasetError> {
104        const PIPELINES: &[&str] = &["add_pipeline", "cognify_pipeline"];
105        let mut statuses: HashMap<Uuid, HashMap<String, PipelineRunStatus>> =
106            HashMap::with_capacity(dataset_ids.len());
107        for &id in dataset_ids {
108            for pipeline_name in PIPELINES {
109                if let Some(status) = self
110                    .db
111                    .get_latest_pipeline_status(pipeline_name, id)
112                    .await?
113                {
114                    statuses
115                        .entry(id)
116                        .or_default()
117                        .insert(pipeline_name.to_string(), status);
118                }
119            }
120        }
121        Ok(statuses)
122    }
123
124    /// Scan a filesystem directory for dataset-like sub-directories.
125    ///
126    /// Returns the names of immediate child directories. This is a sync
127    /// utility matching the Python SDK's `discover_datasets` method.
128    pub fn discover_datasets(
129        directory_path: &std::path::Path,
130    ) -> Result<Vec<String>, DatasetError> {
131        let mut datasets = Vec::new();
132        for entry in std::fs::read_dir(directory_path)? {
133            let entry = entry?;
134            if entry.file_type()?.is_dir()
135                && let Some(name) = entry.file_name().to_str()
136            {
137                datasets.push(name.to_owned());
138            }
139        }
140        Ok(datasets)
141    }
142
143    // ------------------------------------------------------------------
144    // Write / delete operations
145    // ------------------------------------------------------------------
146
147    /// Delete all data in a dataset (and the dataset record itself).
148    ///
149    /// Delegates to `DeleteService` with `DeleteScope::Dataset`.
150    pub async fn empty_dataset(
151        &self,
152        dataset_id: Uuid,
153        owner_id: Uuid,
154        delete_service: &DeleteService,
155    ) -> Result<DeleteResult, DatasetError> {
156        let dataset = self.require_dataset(dataset_id).await?;
157        self.check_delete_permission(owner_id, dataset_id).await?;
158        let request = DeleteRequest {
159            scope: DeleteScope::Dataset {
160                owner_id,
161                dataset_name: dataset.name,
162            },
163            mode: DeleteMode::Hard,
164            memory_only: false,
165        };
166        Ok(delete_service.execute(&request).await?)
167    }
168
169    /// Delete a specific data item from a dataset.
170    ///
171    /// Delegates to `DeleteService` with `DeleteScope::Data`.
172    pub async fn delete_data(
173        &self,
174        dataset_id: Uuid,
175        data_id: Uuid,
176        owner_id: Uuid,
177        mode: DeleteMode,
178        delete_dataset_if_empty: bool,
179        delete_service: &DeleteService,
180    ) -> Result<DeleteResult, DatasetError> {
181        let dataset = self.require_dataset(dataset_id).await?;
182        self.check_delete_permission(owner_id, dataset_id).await?;
183        let request = DeleteRequest {
184            scope: DeleteScope::Data {
185                owner_id,
186                data_id,
187                dataset_name: Some(dataset.name),
188                delete_dataset_if_empty,
189            },
190            mode,
191            memory_only: false,
192        };
193        Ok(delete_service.execute(&request).await?)
194    }
195
196    /// Delete all datasets for an owner.
197    ///
198    /// Lists all accessible datasets and delegates each to `DeleteService`.
199    pub async fn delete_all(
200        &self,
201        owner_id: Uuid,
202        delete_service: &DeleteService,
203    ) -> Result<Vec<DeleteResult>, DatasetError> {
204        let datasets = self.list_datasets(owner_id).await?;
205        let mut results = Vec::with_capacity(datasets.len());
206        for ds in datasets {
207            let request = DeleteRequest {
208                scope: DeleteScope::Dataset {
209                    owner_id,
210                    dataset_name: ds.name,
211                },
212                mode: DeleteMode::Hard,
213                memory_only: false,
214            };
215            results.push(delete_service.execute(&request).await?);
216        }
217        Ok(results)
218    }
219
220    // ------------------------------------------------------------------
221    // Create operations
222    // ------------------------------------------------------------------
223
224    /// Create a dataset with a deterministic ID matching Python's formula:
225    ///   `uuid5(NAMESPACE_OID, f"{name}{user_id}{tenant_id}")`.
226    ///
227    /// Idempotent: if a dataset with the same deterministic ID already exists,
228    /// returns the existing row.
229    pub async fn create_dataset(
230        &self,
231        name: &str,
232        owner_id: Uuid,
233        tenant_id: Option<Uuid>,
234    ) -> Result<Dataset, DatasetError> {
235        let id = generate_dataset_id(name, owner_id, tenant_id);
236        // Try to get existing dataset first (idempotent create).
237        if let Some(existing) = self.db.get_dataset(id).await? {
238            return Ok(existing);
239        }
240        let dataset = Dataset::new(name.to_string(), owner_id, tenant_id, id);
241        Ok(self.db.create_dataset(dataset).await?)
242    }
243
244    /// Create a dataset and grant all four ACL permissions (`read`, `write`,
245    /// `delete`, `share`) to the owner.
246    pub async fn create_authorized_dataset(
247        &self,
248        name: &str,
249        owner_id: Uuid,
250        tenant_id: Option<Uuid>,
251        parent_user_id: Option<Uuid>,
252    ) -> Result<Dataset, DatasetError> {
253        let ds = self.create_dataset(name, owner_id, tenant_id).await?;
254        let acl = self.acl_db.as_ref().ok_or(DatasetError::AclNotConfigured)?;
255
256        // The `acls.principal_id` FK references `principals.id`; ensure the
257        // principal row exists before granting, otherwise the grant fails a
258        // foreign-key constraint. Python's `give_permission_on_dataset` takes
259        // an already-persisted `User`; the Rust facade may be called with a
260        // bare id, so we upsert the principal here. `ensure_principal` is an
261        // idempotent upsert.
262        acl.ensure_principal(owner_id, "user").await?;
263        for perm in DATASET_PERMISSIONS {
264            acl.grant_permission(owner_id, ds.id, perm).await?;
265        }
266        if let Some(parent) = parent_user_id
267            && parent != owner_id
268        {
269            acl.ensure_principal(parent, "user").await?;
270            for perm in DATASET_PERMISSIONS {
271                acl.grant_permission(parent, ds.id, perm).await?;
272            }
273        }
274        Ok(ds)
275    }
276
277    // ------------------------------------------------------------------
278    // Helpers
279    // ------------------------------------------------------------------
280
281    async fn check_read_permission(
282        &self,
283        owner_id: Uuid,
284        dataset_id: Uuid,
285    ) -> Result<(), DatasetError> {
286        if let Some(acl) = &self.acl_db
287            && !acl.has_permission(owner_id, dataset_id, "read").await?
288        {
289            return Err(DatasetError::PermissionDenied);
290        }
291        Ok(())
292    }
293
294    async fn check_delete_permission(
295        &self,
296        owner_id: Uuid,
297        dataset_id: Uuid,
298    ) -> Result<(), DatasetError> {
299        if let Some(acl) = &self.acl_db
300            && !acl.has_permission(owner_id, dataset_id, "delete").await?
301        {
302            return Err(DatasetError::PermissionDenied);
303        }
304        Ok(())
305    }
306
307    async fn require_dataset(&self, id: Uuid) -> Result<Dataset, DatasetError> {
308        self.db.get_dataset(id).await?.ok_or(DatasetError::NotFound)
309    }
310}
311
312#[cfg(test)]
313#[allow(
314    clippy::unwrap_used,
315    clippy::expect_used,
316    reason = "test code — panics are acceptable failures"
317)]
318mod tests {
319    use super::*;
320    use cognee_database::{connect, initialize};
321    use cognee_models::{Data, Dataset};
322    use uuid::Uuid;
323
324    /// Create a fresh in-memory SQLite database with migrations applied.
325    async fn fresh_db() -> Arc<cognee_database::DatabaseConnection> {
326        let db = connect("sqlite::memory:")
327            .await
328            .expect("in-memory SQLite always connects");
329        initialize(&db)
330            .await
331            .expect("migrations succeed on empty DB");
332        Arc::new(db)
333    }
334
335    fn make_dataset(owner_id: Uuid) -> Dataset {
336        Dataset::new(
337            format!("test-dataset-{}", Uuid::new_v4()),
338            owner_id,
339            None,
340            Uuid::new_v4(),
341        )
342    }
343
344    fn make_data(owner_id: Uuid) -> Data {
345        let id = Uuid::new_v4();
346        let loc = format!("file:///tmp/test/{id}.txt");
347        Data::builder(
348            id,
349            "test-data.txt",
350            loc.as_str(),
351            loc.as_str(),
352            "txt",
353            "text/plain",
354            format!("{:x}", Uuid::new_v4()),
355            owner_id,
356        )
357        .build()
358    }
359
360    fn make_data_with_size(owner_id: Uuid, size: i64) -> Data {
361        let id = Uuid::new_v4();
362        let loc = format!("file:///tmp/test/{id}.txt");
363        Data::builder(
364            id,
365            "file.txt",
366            loc.as_str(),
367            loc.as_str(),
368            "txt",
369            "text/plain",
370            format!("{:x}", Uuid::new_v4()),
371            owner_id,
372        )
373        .data_size(size)
374        .build()
375    }
376
377    #[tokio::test]
378    async fn test_list_datasets_no_acl() {
379        let db = fresh_db().await;
380        let owner_id = Uuid::new_v4();
381        let ds = make_dataset(owner_id);
382
383        // Insert dataset directly via IngestDb
384        let ingest: &dyn IngestDb = db.as_ref();
385        ingest
386            .create_dataset(ds.clone())
387            .await
388            .expect("create_dataset");
389
390        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
391        let result = mgr.list_datasets(owner_id).await.expect("list_datasets");
392        assert_eq!(result.len(), 1);
393        assert_eq!(result[0].id, ds.id);
394    }
395
396    #[tokio::test]
397    async fn test_list_datasets_different_owner() {
398        let db = fresh_db().await;
399        let owner_a = Uuid::new_v4();
400        let owner_b = Uuid::new_v4();
401
402        let ingest: &dyn IngestDb = db.as_ref();
403        ingest
404            .create_dataset(make_dataset(owner_a))
405            .await
406            .expect("create_dataset");
407        ingest
408            .create_dataset(make_dataset(owner_b))
409            .await
410            .expect("create_dataset");
411
412        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
413        let result_a = mgr.list_datasets(owner_a).await.expect("list_datasets");
414        assert_eq!(result_a.len(), 1);
415        let result_b = mgr.list_datasets(owner_b).await.expect("list_datasets");
416        assert_eq!(result_b.len(), 1);
417    }
418
419    #[tokio::test]
420    async fn test_has_data_empty_dataset() {
421        let db = fresh_db().await;
422        let owner_id = Uuid::new_v4();
423        let ds = make_dataset(owner_id);
424
425        let ingest: &dyn IngestDb = db.as_ref();
426        ingest
427            .create_dataset(ds.clone())
428            .await
429            .expect("create_dataset");
430
431        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
432        assert!(!mgr.has_data(ds.id, owner_id).await.expect("has_data"));
433    }
434
435    #[tokio::test]
436    async fn test_has_data_with_data() {
437        let db = fresh_db().await;
438        let owner_id = Uuid::new_v4();
439        let ds = make_dataset(owner_id);
440        let data = make_data(owner_id);
441
442        let ingest: &dyn IngestDb = db.as_ref();
443        ingest
444            .create_dataset(ds.clone())
445            .await
446            .expect("create_dataset");
447        ingest.create_data(data.clone()).await.expect("create_data");
448        ingest
449            .attach_data_to_dataset(ds.id, data.id)
450            .await
451            .expect("attach_data");
452
453        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
454        assert!(mgr.has_data(ds.id, owner_id).await.expect("has_data"));
455    }
456
457    #[tokio::test]
458    async fn test_has_data_permission_denied_with_acl() {
459        let db = fresh_db().await;
460        let owner_id = Uuid::new_v4();
461        let other_id = Uuid::new_v4();
462        let ds = make_dataset(owner_id);
463
464        let ingest: &dyn IngestDb = db.as_ref();
465        ingest
466            .create_dataset(ds.clone())
467            .await
468            .expect("create_dataset");
469
470        // Grant read permission to owner only (via ACL). The OSS test path
471        // uses `MockAclDb` because the closed `cognee-access-control` crate
472        // (which provides `AclDb for DatabaseConnection`) is not present.
473        let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
474        acl.ensure_principal(owner_id, "user")
475            .await
476            .expect("ensure_principal");
477        acl.grant_permission(owner_id, ds.id, "read")
478            .await
479            .expect("grant_permission");
480
481        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>).with_acl(acl);
482
483        // Owner can check — should succeed.
484        assert!(
485            mgr.has_data(ds.id, owner_id).await.is_ok(),
486            "owner must be able to call has_data"
487        );
488
489        // Other user gets PermissionDenied.
490        let err = mgr
491            .has_data(ds.id, other_id)
492            .await
493            .expect_err("must fail for unauthorized user");
494        assert!(
495            matches!(err, DatasetError::PermissionDenied),
496            "expected PermissionDenied, got {err:?}"
497        );
498    }
499
500    #[tokio::test]
501    async fn test_list_data() {
502        let db = fresh_db().await;
503        let owner_id = Uuid::new_v4();
504        let ds = make_dataset(owner_id);
505        let data = make_data(owner_id);
506
507        let ingest: &dyn IngestDb = db.as_ref();
508        ingest
509            .create_dataset(ds.clone())
510            .await
511            .expect("create_dataset");
512        ingest.create_data(data.clone()).await.expect("create_data");
513        ingest
514            .attach_data_to_dataset(ds.id, data.id)
515            .await
516            .expect("attach_data");
517
518        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
519        let items = mgr.list_data(ds.id, owner_id).await.expect("list_data");
520        assert_eq!(items.len(), 1);
521        assert_eq!(items[0].id, data.id);
522    }
523
524    #[tokio::test]
525    async fn test_list_data_sorted_by_size_descending() {
526        let db = fresh_db().await;
527        let owner_id = Uuid::new_v4();
528        let ds = make_dataset(owner_id);
529
530        let ingest: &dyn IngestDb = db.as_ref();
531        ingest
532            .create_dataset(ds.clone())
533            .await
534            .expect("create_dataset");
535
536        // Create three data items with distinct sizes.
537        let small = make_data_with_size(owner_id, 10);
538        let large = make_data_with_size(owner_id, 1000);
539        let medium = make_data_with_size(owner_id, 500);
540
541        for d in [&small, &large, &medium] {
542            ingest.create_data(d.clone()).await.expect("create_data");
543            ingest
544                .attach_data_to_dataset(ds.id, d.id)
545                .await
546                .expect("attach_data");
547        }
548
549        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
550        let items = mgr.list_data(ds.id, owner_id).await.expect("list_data");
551        assert_eq!(items.len(), 3);
552        // Must be sorted largest first.
553        assert_eq!(items[0].id, large.id, "largest must come first");
554        assert_eq!(items[1].id, medium.id, "medium second");
555        assert_eq!(items[2].id, small.id, "smallest last");
556    }
557
558    #[tokio::test]
559    async fn test_get_status_no_runs() {
560        let db = fresh_db().await;
561        let owner_id = Uuid::new_v4();
562        let ds = make_dataset(owner_id);
563
564        let ingest: &dyn IngestDb = db.as_ref();
565        ingest
566            .create_dataset(ds.clone())
567            .await
568            .expect("create_dataset");
569
570        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
571        let statuses = mgr.get_status(&[ds.id]).await.expect("get_status");
572        // No pipeline runs recorded → the outer map should be empty.
573        assert!(statuses.is_empty());
574    }
575
576    #[tokio::test]
577    async fn test_discover_datasets() {
578        let tmpdir = tempfile::tempdir().expect("create temp dir");
579        std::fs::create_dir(tmpdir.path().join("dataset-a")).expect("create dir");
580        std::fs::create_dir(tmpdir.path().join("dataset-b")).expect("create dir");
581        // Create a file to verify it's excluded
582        std::fs::write(tmpdir.path().join("not-a-dataset.txt"), "hello").expect("create file");
583
584        let mut result =
585            DatasetManager::discover_datasets(tmpdir.path()).expect("discover_datasets");
586        result.sort();
587        assert_eq!(result, vec!["dataset-a", "dataset-b"]);
588    }
589
590    #[tokio::test]
591    async fn test_require_dataset_not_found() {
592        let db = fresh_db().await;
593        let mgr = DatasetManager::new(db as Arc<dyn DatasetDb>);
594        let err = mgr.require_dataset(Uuid::new_v4()).await;
595        assert!(matches!(err, Err(DatasetError::NotFound)));
596    }
597
598    #[tokio::test]
599    async fn test_create_dataset_deterministic_id() {
600        let db = fresh_db().await;
601        let owner_id = Uuid::new_v4();
602        let tenant_id = Some(Uuid::new_v4());
603        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
604
605        let ds = mgr
606            .create_dataset("my-ds", owner_id, tenant_id)
607            .await
608            .expect("create_dataset");
609
610        let expected_id = generate_dataset_id("my-ds", owner_id, tenant_id);
611        assert_eq!(ds.id, expected_id, "ID must match generate_dataset_id");
612        assert_eq!(ds.name, "my-ds");
613        assert_eq!(ds.owner_id, owner_id);
614    }
615
616    #[tokio::test]
617    async fn test_create_dataset_idempotent() {
618        let db = fresh_db().await;
619        let owner_id = Uuid::new_v4();
620        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
621
622        let ds1 = mgr
623            .create_dataset("dup-ds", owner_id, None)
624            .await
625            .expect("first create");
626        let ds2 = mgr
627            .create_dataset("dup-ds", owner_id, None)
628            .await
629            .expect("second create");
630
631        assert_eq!(ds1.id, ds2.id, "Idempotent: same ID returned on duplicate");
632
633        // Ensure only one row exists
634        let list = IngestDb::list_datasets_by_owner(db.as_ref(), owner_id)
635            .await
636            .unwrap();
637        assert_eq!(list.len(), 1, "Only one row should exist");
638    }
639
640    #[tokio::test]
641    async fn test_create_authorized_dataset_without_acl_errors() {
642        let db = fresh_db().await;
643        let owner_id = Uuid::new_v4();
644        let mgr = DatasetManager::new(db as Arc<dyn DatasetDb>);
645
646        let result = mgr
647            .create_authorized_dataset("auth-ds", owner_id, None, None)
648            .await;
649        assert!(
650            matches!(result, Err(DatasetError::AclNotConfigured)),
651            "Should error when ACL not configured"
652        );
653    }
654
655    #[tokio::test]
656    async fn test_create_authorized_dataset_grants_four_permissions() {
657        let db = fresh_db().await;
658        let owner_id = Uuid::new_v4();
659        let parent_id = Uuid::new_v4();
660        let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
661        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>).with_acl(acl.clone());
662
663        let ds = mgr
664            .create_authorized_dataset("auth-ds", owner_id, None, Some(parent_id))
665            .await
666            .expect("create_authorized_dataset");
667
668        // Owner and parent both receive all four permissions on the dataset.
669        for perm in DATASET_PERMISSIONS {
670            assert!(
671                acl.has_permission(owner_id, ds.id, perm).await.unwrap(),
672                "owner must have '{perm}'"
673            );
674            assert!(
675                acl.has_permission(parent_id, ds.id, perm).await.unwrap(),
676                "parent must have '{perm}'"
677            );
678        }
679    }
680
681    #[tokio::test]
682    async fn test_create_authorized_dataset_parent_equals_owner_no_duplicate() {
683        let db = fresh_db().await;
684        let owner_id = Uuid::new_v4();
685        let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
686        let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>).with_acl(acl.clone());
687
688        // parent_user_id == owner_id: must succeed (idempotent) and grant once.
689        let ds = mgr
690            .create_authorized_dataset("auth-ds-self", owner_id, None, Some(owner_id))
691            .await
692            .expect("create_authorized_dataset with self-parent should succeed");
693
694        for perm in DATASET_PERMISSIONS {
695            assert!(
696                acl.has_permission(owner_id, ds.id, perm).await.unwrap(),
697                "owner must have '{perm}'"
698            );
699        }
700    }
701}