1use std::collections::HashMap;
7use std::sync::Arc;
8
9use cognee_database::{AclDb, DeleteDb, IngestDb, PipelineRunStatus};
10use cognee_delete::{DeleteMode, DeleteRequest, DeleteResult, DeleteScope, DeleteService};
11use cognee_ingestion::generate_dataset_id;
12use cognee_models::{Data, Dataset};
13use uuid::Uuid;
14
15use super::error::DatasetError;
16
/// Permission names granted on a dataset by
/// `DatasetManager::create_authorized_dataset`, to the owner and, when one is
/// supplied and differs from the owner, to the parent user.
const DATASET_PERMISSIONS: [&str; 4] = ["read", "write", "delete", "share"];
18
19pub trait DatasetDb: IngestDb + DeleteDb + Send + Sync {}
24impl<T: IngestDb + DeleteDb + Send + Sync> DatasetDb for T {}
25
/// Dataset-level operations (listing, inspection, creation, deletion) on top of
/// a storage backend, with optional ACL enforcement.
pub struct DatasetManager {
    /// Storage backend used for every dataset and data query.
    db: Arc<dyn DatasetDb>,
    /// Optional ACL store. When `None`, the read/delete permission checks in
    /// this type succeed unconditionally and listing is by ownership.
    acl_db: Option<Arc<dyn AclDb>>,
}
34
35impl DatasetManager {
36 pub fn new(db: Arc<dyn DatasetDb>) -> Self {
38 Self { db, acl_db: None }
39 }
40
41 pub fn with_acl(mut self, acl_db: Arc<dyn AclDb>) -> Self {
43 self.acl_db = Some(acl_db);
44 self
45 }
46
47 pub async fn list_datasets(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatasetError> {
56 if let Some(acl) = &self.acl_db {
57 let authorized_ids = acl.authorized_dataset_ids(owner_id, "read").await?;
58 let mut datasets = Vec::with_capacity(authorized_ids.len());
59 for id in authorized_ids {
60 if let Some(ds) = self.db.get_dataset(id).await? {
61 datasets.push(ds);
62 }
63 }
64 Ok(datasets)
65 } else {
66 Ok(IngestDb::list_datasets_by_owner(self.db.as_ref(), owner_id).await?)
67 }
68 }
69
70 pub async fn list_data(
75 &self,
76 dataset_id: Uuid,
77 owner_id: Uuid,
78 ) -> Result<Vec<Data>, DatasetError> {
79 self.check_read_permission(owner_id, dataset_id).await?;
80 let mut items = self.db.get_dataset_data(dataset_id).await?;
81 items.sort_by_key(|b| std::cmp::Reverse(b.data_size));
82 Ok(items)
83 }
84
85 pub async fn has_data(&self, dataset_id: Uuid, owner_id: Uuid) -> Result<bool, DatasetError> {
90 self.check_read_permission(owner_id, dataset_id).await?;
91 let count = self.db.count_dataset_data(dataset_id).await?;
92 Ok(count > 0)
93 }
94
95 pub async fn get_status(
101 &self,
102 dataset_ids: &[Uuid],
103 ) -> Result<HashMap<Uuid, HashMap<String, PipelineRunStatus>>, DatasetError> {
104 const PIPELINES: &[&str] = &["add_pipeline", "cognify_pipeline"];
105 let mut statuses: HashMap<Uuid, HashMap<String, PipelineRunStatus>> =
106 HashMap::with_capacity(dataset_ids.len());
107 for &id in dataset_ids {
108 for pipeline_name in PIPELINES {
109 if let Some(status) = self
110 .db
111 .get_latest_pipeline_status(pipeline_name, id)
112 .await?
113 {
114 statuses
115 .entry(id)
116 .or_default()
117 .insert(pipeline_name.to_string(), status);
118 }
119 }
120 }
121 Ok(statuses)
122 }
123
124 pub fn discover_datasets(
129 directory_path: &std::path::Path,
130 ) -> Result<Vec<String>, DatasetError> {
131 let mut datasets = Vec::new();
132 for entry in std::fs::read_dir(directory_path)? {
133 let entry = entry?;
134 if entry.file_type()?.is_dir()
135 && let Some(name) = entry.file_name().to_str()
136 {
137 datasets.push(name.to_owned());
138 }
139 }
140 Ok(datasets)
141 }
142
143 pub async fn empty_dataset(
151 &self,
152 dataset_id: Uuid,
153 owner_id: Uuid,
154 delete_service: &DeleteService,
155 ) -> Result<DeleteResult, DatasetError> {
156 let dataset = self.require_dataset(dataset_id).await?;
157 self.check_delete_permission(owner_id, dataset_id).await?;
158 let request = DeleteRequest {
159 scope: DeleteScope::Dataset {
160 owner_id,
161 dataset_name: dataset.name,
162 },
163 mode: DeleteMode::Hard,
164 memory_only: false,
165 };
166 Ok(delete_service.execute(&request).await?)
167 }
168
169 pub async fn delete_data(
173 &self,
174 dataset_id: Uuid,
175 data_id: Uuid,
176 owner_id: Uuid,
177 mode: DeleteMode,
178 delete_dataset_if_empty: bool,
179 delete_service: &DeleteService,
180 ) -> Result<DeleteResult, DatasetError> {
181 let dataset = self.require_dataset(dataset_id).await?;
182 self.check_delete_permission(owner_id, dataset_id).await?;
183 let request = DeleteRequest {
184 scope: DeleteScope::Data {
185 owner_id,
186 data_id,
187 dataset_name: Some(dataset.name),
188 delete_dataset_if_empty,
189 },
190 mode,
191 memory_only: false,
192 };
193 Ok(delete_service.execute(&request).await?)
194 }
195
196 pub async fn delete_all(
200 &self,
201 owner_id: Uuid,
202 delete_service: &DeleteService,
203 ) -> Result<Vec<DeleteResult>, DatasetError> {
204 let datasets = self.list_datasets(owner_id).await?;
205 let mut results = Vec::with_capacity(datasets.len());
206 for ds in datasets {
207 let request = DeleteRequest {
208 scope: DeleteScope::Dataset {
209 owner_id,
210 dataset_name: ds.name,
211 },
212 mode: DeleteMode::Hard,
213 memory_only: false,
214 };
215 results.push(delete_service.execute(&request).await?);
216 }
217 Ok(results)
218 }
219
220 pub async fn create_dataset(
230 &self,
231 name: &str,
232 owner_id: Uuid,
233 tenant_id: Option<Uuid>,
234 ) -> Result<Dataset, DatasetError> {
235 let id = generate_dataset_id(name, owner_id, tenant_id);
236 if let Some(existing) = self.db.get_dataset(id).await? {
238 return Ok(existing);
239 }
240 let dataset = Dataset::new(name.to_string(), owner_id, tenant_id, id);
241 Ok(self.db.create_dataset(dataset).await?)
242 }
243
244 pub async fn create_authorized_dataset(
247 &self,
248 name: &str,
249 owner_id: Uuid,
250 tenant_id: Option<Uuid>,
251 parent_user_id: Option<Uuid>,
252 ) -> Result<Dataset, DatasetError> {
253 let ds = self.create_dataset(name, owner_id, tenant_id).await?;
254 let acl = self.acl_db.as_ref().ok_or(DatasetError::AclNotConfigured)?;
255
256 acl.ensure_principal(owner_id, "user").await?;
263 for perm in DATASET_PERMISSIONS {
264 acl.grant_permission(owner_id, ds.id, perm).await?;
265 }
266 if let Some(parent) = parent_user_id
267 && parent != owner_id
268 {
269 acl.ensure_principal(parent, "user").await?;
270 for perm in DATASET_PERMISSIONS {
271 acl.grant_permission(parent, ds.id, perm).await?;
272 }
273 }
274 Ok(ds)
275 }
276
277 async fn check_read_permission(
282 &self,
283 owner_id: Uuid,
284 dataset_id: Uuid,
285 ) -> Result<(), DatasetError> {
286 if let Some(acl) = &self.acl_db
287 && !acl.has_permission(owner_id, dataset_id, "read").await?
288 {
289 return Err(DatasetError::PermissionDenied);
290 }
291 Ok(())
292 }
293
294 async fn check_delete_permission(
295 &self,
296 owner_id: Uuid,
297 dataset_id: Uuid,
298 ) -> Result<(), DatasetError> {
299 if let Some(acl) = &self.acl_db
300 && !acl.has_permission(owner_id, dataset_id, "delete").await?
301 {
302 return Err(DatasetError::PermissionDenied);
303 }
304 Ok(())
305 }
306
307 async fn require_dataset(&self, id: Uuid) -> Result<Dataset, DatasetError> {
308 self.db.get_dataset(id).await?.ok_or(DatasetError::NotFound)
309 }
310}
311
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    use super::*;
    use cognee_database::{connect, initialize};
    use cognee_models::{Data, Dataset};
    use uuid::Uuid;

    /// Shared handle to the in-memory database each test runs against.
    type TestDb = Arc<cognee_database::DatabaseConnection>;

    /// Opens a brand-new in-memory SQLite database and initializes it.
    async fn fresh_db() -> TestDb {
        let conn = connect("sqlite::memory:")
            .await
            .expect("in-memory SQLite always connects");
        initialize(&conn)
            .await
            .expect("migrations succeed on empty DB");
        Arc::new(conn)
    }

    /// Builds an unsaved dataset with a random name and ID for `owner_id`.
    fn make_dataset(owner_id: Uuid) -> Dataset {
        let name = format!("test-dataset-{}", Uuid::new_v4());
        let dataset_id = Uuid::new_v4();
        Dataset::new(name, owner_id, None, dataset_id)
    }

    /// Builds an unsaved data item for `owner_id` without an explicit size.
    fn make_data(owner_id: Uuid) -> Data {
        let id = Uuid::new_v4();
        let location = format!("file:///tmp/test/{id}.txt");
        let hex_token = format!("{:x}", Uuid::new_v4());
        Data::builder(
            id,
            "test-data.txt",
            location.as_str(),
            location.as_str(),
            "txt",
            "text/plain",
            hex_token,
            owner_id,
        )
        .build()
    }

    /// Builds an unsaved data item for `owner_id` whose `data_size` is `size`.
    fn make_data_with_size(owner_id: Uuid, size: i64) -> Data {
        let id = Uuid::new_v4();
        let location = format!("file:///tmp/test/{id}.txt");
        let hex_token = format!("{:x}", Uuid::new_v4());
        Data::builder(
            id,
            "file.txt",
            location.as_str(),
            location.as_str(),
            "txt",
            "text/plain",
            hex_token,
            owner_id,
        )
        .data_size(size)
        .build()
    }

    /// Wraps `db` in a manager that has no ACL store.
    fn manager(db: &TestDb) -> DatasetManager {
        DatasetManager::new(db.clone() as Arc<dyn DatasetDb>)
    }

    /// Persists a copy of `dataset` through the `IngestDb` interface.
    async fn insert_dataset(db: &TestDb, dataset: &Dataset) {
        let ingest: &dyn IngestDb = db.as_ref();
        ingest
            .create_dataset(dataset.clone())
            .await
            .expect("create_dataset");
    }

    /// Persists a copy of `data` and attaches it to the dataset `dataset_id`.
    async fn insert_attached_data(db: &TestDb, dataset_id: Uuid, data: &Data) {
        let ingest: &dyn IngestDb = db.as_ref();
        ingest.create_data(data.clone()).await.expect("create_data");
        ingest
            .attach_data_to_dataset(dataset_id, data.id)
            .await
            .expect("attach_data");
    }

    #[tokio::test]
    async fn test_list_datasets_no_acl() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let ds = make_dataset(owner_id);
        insert_dataset(&db, &ds).await;

        let mgr = manager(&db);
        let listed = mgr.list_datasets(owner_id).await.expect("list_datasets");
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, ds.id);
    }

    #[tokio::test]
    async fn test_list_datasets_different_owner() {
        let db = fresh_db().await;
        let owner_a = Uuid::new_v4();
        let owner_b = Uuid::new_v4();
        insert_dataset(&db, &make_dataset(owner_a)).await;
        insert_dataset(&db, &make_dataset(owner_b)).await;

        let mgr = manager(&db);
        let listed_a = mgr.list_datasets(owner_a).await.expect("list_datasets");
        assert_eq!(listed_a.len(), 1);
        let listed_b = mgr.list_datasets(owner_b).await.expect("list_datasets");
        assert_eq!(listed_b.len(), 1);
    }

    #[tokio::test]
    async fn test_has_data_empty_dataset() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let ds = make_dataset(owner_id);
        insert_dataset(&db, &ds).await;

        let mgr = manager(&db);
        let populated = mgr.has_data(ds.id, owner_id).await.expect("has_data");
        assert!(!populated);
    }

    #[tokio::test]
    async fn test_has_data_with_data() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let ds = make_dataset(owner_id);
        let data = make_data(owner_id);
        insert_dataset(&db, &ds).await;
        insert_attached_data(&db, ds.id, &data).await;

        let mgr = manager(&db);
        let populated = mgr.has_data(ds.id, owner_id).await.expect("has_data");
        assert!(populated);
    }

    #[tokio::test]
    async fn test_has_data_permission_denied_with_acl() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let other_id = Uuid::new_v4();
        let ds = make_dataset(owner_id);
        insert_dataset(&db, &ds).await;

        // Only the owner is registered and granted "read".
        let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
        acl.ensure_principal(owner_id, "user")
            .await
            .expect("ensure_principal");
        acl.grant_permission(owner_id, ds.id, "read")
            .await
            .expect("grant_permission");

        let mgr = manager(&db).with_acl(acl);

        let owner_outcome = mgr.has_data(ds.id, owner_id).await;
        assert!(owner_outcome.is_ok(), "owner must be able to call has_data");

        let err = mgr
            .has_data(ds.id, other_id)
            .await
            .expect_err("must fail for unauthorized user");
        assert!(
            matches!(err, DatasetError::PermissionDenied),
            "expected PermissionDenied, got {err:?}"
        );
    }

    #[tokio::test]
    async fn test_list_data() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let ds = make_dataset(owner_id);
        let data = make_data(owner_id);
        insert_dataset(&db, &ds).await;
        insert_attached_data(&db, ds.id, &data).await;

        let mgr = manager(&db);
        let items = mgr.list_data(ds.id, owner_id).await.expect("list_data");
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].id, data.id);
    }

    #[tokio::test]
    async fn test_list_data_sorted_by_size_descending() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let ds = make_dataset(owner_id);
        insert_dataset(&db, &ds).await;

        // Inserted out of size order on purpose.
        let small = make_data_with_size(owner_id, 10);
        let large = make_data_with_size(owner_id, 1000);
        let medium = make_data_with_size(owner_id, 500);
        for item in [&small, &large, &medium] {
            insert_attached_data(&db, ds.id, item).await;
        }

        let mgr = manager(&db);
        let items = mgr.list_data(ds.id, owner_id).await.expect("list_data");
        assert_eq!(items.len(), 3);
        assert_eq!(items[0].id, large.id, "largest must come first");
        assert_eq!(items[1].id, medium.id, "medium second");
        assert_eq!(items[2].id, small.id, "smallest last");
    }

    #[tokio::test]
    async fn test_get_status_no_runs() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let ds = make_dataset(owner_id);
        insert_dataset(&db, &ds).await;

        let mgr = manager(&db);
        let statuses = mgr.get_status(&[ds.id]).await.expect("get_status");
        assert!(statuses.is_empty());
    }

    #[tokio::test]
    async fn test_discover_datasets() {
        let tmpdir = tempfile::tempdir().expect("create temp dir");
        let root = tmpdir.path();
        for dir_name in ["dataset-a", "dataset-b"] {
            std::fs::create_dir(root.join(dir_name)).expect("create dir");
        }
        std::fs::write(root.join("not-a-dataset.txt"), "hello").expect("create file");

        let mut result = DatasetManager::discover_datasets(root).expect("discover_datasets");
        result.sort();
        assert_eq!(result, vec!["dataset-a", "dataset-b"]);
    }

    #[tokio::test]
    async fn test_require_dataset_not_found() {
        let db = fresh_db().await;
        let mgr = manager(&db);
        let outcome = mgr.require_dataset(Uuid::new_v4()).await;
        assert!(matches!(outcome, Err(DatasetError::NotFound)));
    }

    #[tokio::test]
    async fn test_create_dataset_deterministic_id() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let tenant_id = Some(Uuid::new_v4());
        let mgr = manager(&db);

        let ds = mgr
            .create_dataset("my-ds", owner_id, tenant_id)
            .await
            .expect("create_dataset");

        let expected_id = generate_dataset_id("my-ds", owner_id, tenant_id);
        assert_eq!(ds.id, expected_id, "ID must match generate_dataset_id");
        assert_eq!(ds.name, "my-ds");
        assert_eq!(ds.owner_id, owner_id);
    }

    #[tokio::test]
    async fn test_create_dataset_idempotent() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let mgr = manager(&db);

        let first = mgr
            .create_dataset("dup-ds", owner_id, None)
            .await
            .expect("first create");
        let second = mgr
            .create_dataset("dup-ds", owner_id, None)
            .await
            .expect("second create");
        assert_eq!(
            first.id, second.id,
            "Idempotent: same ID returned on duplicate"
        );

        let stored = IngestDb::list_datasets_by_owner(db.as_ref(), owner_id)
            .await
            .unwrap();
        assert_eq!(stored.len(), 1, "Only one row should exist");
    }

    #[tokio::test]
    async fn test_create_authorized_dataset_without_acl_errors() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let mgr = manager(&db);

        let result = mgr
            .create_authorized_dataset("auth-ds", owner_id, None, None)
            .await;
        assert!(
            matches!(result, Err(DatasetError::AclNotConfigured)),
            "Should error when ACL not configured"
        );
    }

    #[tokio::test]
    async fn test_create_authorized_dataset_grants_four_permissions() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let parent_id = Uuid::new_v4();
        let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
        let mgr = manager(&db).with_acl(acl.clone());

        let ds = mgr
            .create_authorized_dataset("auth-ds", owner_id, None, Some(parent_id))
            .await
            .expect("create_authorized_dataset");

        for perm in DATASET_PERMISSIONS {
            let owner_allowed = acl.has_permission(owner_id, ds.id, perm).await.unwrap();
            assert!(owner_allowed, "owner must have '{perm}'");
            let parent_allowed = acl.has_permission(parent_id, ds.id, perm).await.unwrap();
            assert!(parent_allowed, "parent must have '{perm}'");
        }
    }

    #[tokio::test]
    async fn test_create_authorized_dataset_parent_equals_owner_no_duplicate() {
        let db = fresh_db().await;
        let owner_id = Uuid::new_v4();
        let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
        let mgr = manager(&db).with_acl(acl.clone());

        // The owner doubles as the parent here.
        let ds = mgr
            .create_authorized_dataset("auth-ds-self", owner_id, None, Some(owner_id))
            .await
            .expect("create_authorized_dataset with self-parent should succeed");

        for perm in DATASET_PERMISSIONS {
            let owner_allowed = acl.has_permission(owner_id, ds.id, perm).await.unwrap();
            assert!(owner_allowed, "owner must have '{perm}'");
        }
    }
}