use std::collections::HashMap;
use std::sync::Arc;
use cognee_database::{AclDb, DeleteDb, IngestDb, PipelineRunStatus};
use cognee_delete::{DeleteMode, DeleteRequest, DeleteResult, DeleteScope, DeleteService};
use cognee_ingestion::generate_dataset_id;
use cognee_models::{Data, Dataset};
use uuid::Uuid;
use super::error::DatasetError;
/// Permission names granted on a dataset by `create_authorized_dataset`.
const DATASET_PERMISSIONS: [&str; 4] = ["read", "write", "delete", "share"];
/// Combined database capability used by `DatasetManager`: ingest plus delete
/// operations on a handle that is `Send + Sync`.
pub trait DatasetDb: IngestDb + DeleteDb + Send + Sync {}
// Blanket implementation: any type meeting the supertrait bounds is a `DatasetDb`.
impl<T: IngestDb + DeleteDb + Send + Sync> DatasetDb for T {}
/// Dataset-level operations over a shared database handle, with optional ACL checks.
pub struct DatasetManager {
/// Database handle used for dataset, data and pipeline-status queries.
db: Arc<dyn DatasetDb>,
/// Optional ACL backend; the read/delete permission checks in this type are
/// skipped while this is `None`.
acl_db: Option<Arc<dyn AclDb>>,
}
impl DatasetManager {
/// Builds a manager backed by `db` with no ACL backend attached.
pub fn new(db: Arc<dyn DatasetDb>) -> Self {
    let acl_db = None;
    Self { db, acl_db }
}
/// Consumes the manager and returns it with `acl_db` installed as the ACL backend.
pub fn with_acl(self, acl_db: Arc<dyn AclDb>) -> Self {
    let mut manager = self;
    manager.acl_db = Some(acl_db);
    manager
}
/// Lists the datasets visible to `owner_id`.
///
/// Without an ACL backend this is the list of datasets owned by `owner_id`.
/// With one, it is every dataset id the ACL reports as readable by `owner_id`
/// that still resolves to a stored dataset; ids with no stored dataset are skipped.
///
/// # Errors
/// Propagates ACL and database failures.
pub async fn list_datasets(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatasetError> {
    let Some(acl) = &self.acl_db else {
        let owned = IngestDb::list_datasets_by_owner(self.db.as_ref(), owner_id).await?;
        return Ok(owned);
    };

    let readable_ids = acl.authorized_dataset_ids(owner_id, "read").await?;
    let mut visible = Vec::with_capacity(readable_ids.len());
    for dataset_id in readable_ids {
        let Some(dataset) = self.db.get_dataset(dataset_id).await? else {
            continue;
        };
        visible.push(dataset);
    }
    Ok(visible)
}
/// Returns the data items attached to `dataset_id`, largest `data_size` first.
///
/// # Errors
/// Returns `DatasetError::PermissionDenied` when an ACL backend is configured and
/// `owner_id` lacks "read" on the dataset; propagates ACL and database failures.
pub async fn list_data(
    &self,
    dataset_id: Uuid,
    owner_id: Uuid,
) -> Result<Vec<Data>, DatasetError> {
    self.check_read_permission(owner_id, dataset_id).await?;
    let mut rows = self.db.get_dataset_data(dataset_id).await?;
    // Stable sort with the comparison flipped, so equal sizes keep their fetched order.
    rows.sort_by(|lhs, rhs| rhs.data_size.cmp(&lhs.data_size));
    Ok(rows)
}
/// Reports whether at least one data item is attached to `dataset_id`.
///
/// # Errors
/// Returns `DatasetError::PermissionDenied` when an ACL backend is configured and
/// `owner_id` lacks "read" on the dataset; propagates ACL and database failures.
pub async fn has_data(&self, dataset_id: Uuid, owner_id: Uuid) -> Result<bool, DatasetError> {
    self.check_read_permission(owner_id, dataset_id).await?;
    let is_populated = self.db.count_dataset_data(dataset_id).await? > 0;
    Ok(is_populated)
}
/// Collects the latest status of the "add_pipeline" and "cognify_pipeline" runs for
/// each id in `dataset_ids`.
///
/// A dataset appears in the outer map only if at least one of its pipelines has a
/// recorded status; the inner map is keyed by pipeline name.
///
/// # Errors
/// Propagates database failures.
pub async fn get_status(
    &self,
    dataset_ids: &[Uuid],
) -> Result<HashMap<Uuid, HashMap<String, PipelineRunStatus>>, DatasetError> {
    const PIPELINES: &[&str] = &["add_pipeline", "cognify_pipeline"];

    let mut by_dataset: HashMap<Uuid, HashMap<String, PipelineRunStatus>> =
        HashMap::with_capacity(dataset_ids.len());
    for dataset_id in dataset_ids.iter().copied() {
        for &pipeline in PIPELINES {
            let latest = self
                .db
                .get_latest_pipeline_status(pipeline, dataset_id)
                .await?;
            let Some(status) = latest else {
                continue;
            };
            by_dataset
                .entry(dataset_id)
                .or_default()
                .insert(pipeline.to_owned(), status);
        }
    }
    Ok(by_dataset)
}
pub fn discover_datasets(
directory_path: &std::path::Path,
) -> Result<Vec<String>, DatasetError> {
let mut datasets = Vec::new();
for entry in std::fs::read_dir(directory_path)? {
let entry = entry?;
if entry.file_type()?.is_dir()
&& let Some(name) = entry.file_name().to_str()
{
datasets.push(name.to_owned());
}
}
Ok(datasets)
}
/// Submits a hard delete for the dataset identified by `dataset_id`.
///
/// The dataset is looked up first, then the "delete" permission is checked (only when
/// an ACL backend is configured), and finally a `DeleteScope::Dataset` request built
/// from `owner_id` and the stored dataset name is executed.
///
/// # Errors
/// `DatasetError::NotFound` when the dataset does not exist,
/// `DatasetError::PermissionDenied` when the ACL refuses "delete"; propagates
/// database, ACL and delete-service failures.
pub async fn empty_dataset(
    &self,
    dataset_id: Uuid,
    owner_id: Uuid,
    delete_service: &DeleteService,
) -> Result<DeleteResult, DatasetError> {
    let target = self.require_dataset(dataset_id).await?;
    self.check_delete_permission(owner_id, dataset_id).await?;

    let scope = DeleteScope::Dataset {
        owner_id,
        dataset_name: target.name,
    };
    let request = DeleteRequest {
        scope,
        mode: DeleteMode::Hard,
        memory_only: false,
    };
    let outcome = delete_service.execute(&request).await?;
    Ok(outcome)
}
/// Submits a delete for a single data item within the dataset `dataset_id`.
///
/// The dataset is looked up first, then the "delete" permission is checked (only when
/// an ACL backend is configured). The request carries the caller-chosen `mode` and the
/// `delete_dataset_if_empty` flag through to the delete service unchanged.
///
/// # Errors
/// `DatasetError::NotFound` when the dataset does not exist,
/// `DatasetError::PermissionDenied` when the ACL refuses "delete"; propagates
/// database, ACL and delete-service failures.
pub async fn delete_data(
    &self,
    dataset_id: Uuid,
    data_id: Uuid,
    owner_id: Uuid,
    mode: DeleteMode,
    delete_dataset_if_empty: bool,
    delete_service: &DeleteService,
) -> Result<DeleteResult, DatasetError> {
    let parent = self.require_dataset(dataset_id).await?;
    self.check_delete_permission(owner_id, dataset_id).await?;

    let scope = DeleteScope::Data {
        owner_id,
        data_id,
        dataset_name: Some(parent.name),
        delete_dataset_if_empty,
    };
    let request = DeleteRequest {
        scope,
        mode,
        memory_only: false,
    };
    let outcome = delete_service.execute(&request).await?;
    Ok(outcome)
}
/// Hard-deletes every dataset listed for `owner_id` that the caller is allowed to delete.
///
/// Candidates come from `list_datasets`. When an ACL backend is configured that list
/// contains datasets the caller can *read*, so each candidate is additionally checked
/// for the "delete" permission and skipped when it is missing — the same permission
/// `empty_dataset` and `delete_data` require. Without an ACL backend every listed
/// dataset is deleted.
///
/// Returns one `DeleteResult` per dataset actually submitted for deletion, in listing
/// order.
///
/// # Errors
/// Propagates listing, ACL and delete-service failures; processing stops at the first
/// failure.
pub async fn delete_all(
    &self,
    owner_id: Uuid,
    delete_service: &DeleteService,
) -> Result<Vec<DeleteResult>, DatasetError> {
    let datasets = self.list_datasets(owner_id).await?;
    let mut results = Vec::with_capacity(datasets.len());
    for ds in datasets {
        // Read access alone must never be enough to destroy a dataset.
        if let Some(acl) = &self.acl_db {
            if !acl.has_permission(owner_id, ds.id, "delete").await? {
                continue;
            }
        }
        let request = DeleteRequest {
            scope: DeleteScope::Dataset {
                owner_id,
                dataset_name: ds.name,
            },
            mode: DeleteMode::Hard,
            memory_only: false,
        };
        results.push(delete_service.execute(&request).await?);
    }
    Ok(results)
}
/// Returns the dataset for (`name`, `owner_id`, `tenant_id`), creating it if absent.
///
/// The id is derived with `generate_dataset_id`; when a dataset with that id is
/// already stored it is returned as-is and nothing is written.
///
/// # Errors
/// Propagates database failures.
pub async fn create_dataset(
    &self,
    name: &str,
    owner_id: Uuid,
    tenant_id: Option<Uuid>,
) -> Result<Dataset, DatasetError> {
    let id = generate_dataset_id(name, owner_id, tenant_id);
    match self.db.get_dataset(id).await? {
        Some(existing) => Ok(existing),
        None => {
            let fresh = Dataset::new(name.to_string(), owner_id, tenant_id, id);
            let stored = self.db.create_dataset(fresh).await?;
            Ok(stored)
        }
    }
}
/// Creates (or fetches) a dataset and grants every permission in
/// `DATASET_PERMISSIONS` on it to `owner_id` and, when given and different from the
/// owner, to `parent_user_id`.
///
/// Each grantee is first registered through `ensure_principal` with kind "user".
///
/// # Errors
/// Returns `DatasetError::AclNotConfigured` when no ACL backend is attached; this is
/// checked before the dataset is created so the failure leaves no dataset behind.
/// Propagates database and ACL failures.
pub async fn create_authorized_dataset(
    &self,
    name: &str,
    owner_id: Uuid,
    tenant_id: Option<Uuid>,
    parent_user_id: Option<Uuid>,
) -> Result<Dataset, DatasetError> {
    // Resolve the ACL backend first: failing after `create_dataset` would leave a
    // dataset row with no permissions attached.
    let acl = self.acl_db.as_ref().ok_or(DatasetError::AclNotConfigured)?;
    let ds = self.create_dataset(name, owner_id, tenant_id).await?;

    // Owner first, then the parent unless it is the same principal.
    let parent = parent_user_id.filter(|parent| *parent != owner_id);
    for principal in std::iter::once(owner_id).chain(parent) {
        acl.ensure_principal(principal, "user").await?;
        for perm in DATASET_PERMISSIONS {
            acl.grant_permission(principal, ds.id, perm).await?;
        }
    }
    Ok(ds)
}
/// Succeeds when no ACL backend is configured or when the ACL grants `owner_id`
/// the "read" permission on `dataset_id`.
///
/// # Errors
/// `DatasetError::PermissionDenied` when the ACL refuses; propagates ACL failures.
async fn check_read_permission(
    &self,
    owner_id: Uuid,
    dataset_id: Uuid,
) -> Result<(), DatasetError> {
    let Some(acl) = self.acl_db.as_ref() else {
        return Ok(());
    };
    if acl.has_permission(owner_id, dataset_id, "read").await? {
        Ok(())
    } else {
        Err(DatasetError::PermissionDenied)
    }
}
/// Succeeds when no ACL backend is configured or when the ACL grants `owner_id`
/// the "delete" permission on `dataset_id`.
///
/// # Errors
/// `DatasetError::PermissionDenied` when the ACL refuses; propagates ACL failures.
async fn check_delete_permission(
    &self,
    owner_id: Uuid,
    dataset_id: Uuid,
) -> Result<(), DatasetError> {
    let Some(acl) = self.acl_db.as_ref() else {
        return Ok(());
    };
    if acl.has_permission(owner_id, dataset_id, "delete").await? {
        Ok(())
    } else {
        Err(DatasetError::PermissionDenied)
    }
}
/// Fetches the dataset with the given `id`, turning a missing dataset into
/// `DatasetError::NotFound`. Database failures are propagated.
async fn require_dataset(&self, id: Uuid) -> Result<Dataset, DatasetError> {
    match self.db.get_dataset(id).await? {
        Some(dataset) => Ok(dataset),
        None => Err(DatasetError::NotFound),
    }
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
reason = "test code — panics are acceptable failures"
)]
mod tests {
use super::*;
use cognee_database::{connect, initialize};
use cognee_models::{Data, Dataset};
use uuid::Uuid;
/// Connects to an in-memory SQLite database, runs `initialize` on it, and returns
/// the connection behind an `Arc`.
async fn fresh_db() -> Arc<cognee_database::DatabaseConnection> {
    let conn = connect("sqlite::memory:")
        .await
        .expect("in-memory SQLite always connects");
    let init_outcome = initialize(&conn).await;
    init_outcome.expect("migrations succeed on empty DB");
    Arc::new(conn)
}
/// Builds an unsaved dataset for `owner_id` with a random name suffix, no tenant,
/// and a random id.
fn make_dataset(owner_id: Uuid) -> Dataset {
    let name = format!("test-dataset-{}", Uuid::new_v4());
    let id = Uuid::new_v4();
    Dataset::new(name, owner_id, None, id)
}
/// Builds an unsaved `Data` record named "test-data.txt" for `owner_id`, with a
/// random id and a random hex token.
fn make_data(owner_id: Uuid) -> Data {
    let id = Uuid::new_v4();
    let loc = format!("file:///tmp/test/{id}.txt");
    let hex_token = format!("{:x}", Uuid::new_v4());
    let builder = Data::builder(
        id,
        "test-data.txt",
        loc.as_str(),
        loc.as_str(),
        "txt",
        "text/plain",
        hex_token,
        owner_id,
    );
    builder.build()
}
/// Builds an unsaved `Data` record named "file.txt" for `owner_id` whose
/// `data_size` is set to `size`.
fn make_data_with_size(owner_id: Uuid, size: i64) -> Data {
    let id = Uuid::new_v4();
    let loc = format!("file:///tmp/test/{id}.txt");
    let hex_token = format!("{:x}", Uuid::new_v4());
    let sized_builder = Data::builder(
        id,
        "file.txt",
        loc.as_str(),
        loc.as_str(),
        "txt",
        "text/plain",
        hex_token,
        owner_id,
    )
    .data_size(size);
    sized_builder.build()
}
/// Without an ACL backend, `list_datasets` returns the single dataset stored for the owner.
#[tokio::test]
async fn test_list_datasets_no_acl() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let expected = make_dataset(owner_id);
    let store: &dyn IngestDb = db.as_ref();
    store
        .create_dataset(expected.clone())
        .await
        .expect("create_dataset");

    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
    let listed = manager
        .list_datasets(owner_id)
        .await
        .expect("list_datasets");
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].id, expected.id);
}
/// Each owner sees exactly their own dataset, not merely "one dataset".
#[tokio::test]
async fn test_list_datasets_different_owner() {
    let db = fresh_db().await;
    let owner_a = Uuid::new_v4();
    let owner_b = Uuid::new_v4();
    let ds_a = make_dataset(owner_a);
    let ds_b = make_dataset(owner_b);
    let ingest: &dyn IngestDb = db.as_ref();
    ingest
        .create_dataset(ds_a.clone())
        .await
        .expect("create_dataset");
    ingest
        .create_dataset(ds_b.clone())
        .await
        .expect("create_dataset");
    let mgr = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
    let result_a = mgr.list_datasets(owner_a).await.expect("list_datasets");
    assert_eq!(result_a.len(), 1);
    // A length check alone would also pass if the owners were swapped.
    assert_eq!(result_a[0].id, ds_a.id, "owner A must see their own dataset");
    let result_b = mgr.list_datasets(owner_b).await.expect("list_datasets");
    assert_eq!(result_b.len(), 1);
    assert_eq!(result_b[0].id, ds_b.id, "owner B must see their own dataset");
}
/// A dataset with nothing attached reports `has_data == false`.
#[tokio::test]
async fn test_has_data_empty_dataset() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let empty = make_dataset(owner_id);
    let store: &dyn IngestDb = db.as_ref();
    store
        .create_dataset(empty.clone())
        .await
        .expect("create_dataset");

    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
    let populated = manager.has_data(empty.id, owner_id).await.expect("has_data");
    assert!(!populated);
}
/// A dataset with one attached data item reports `has_data == true`.
#[tokio::test]
async fn test_has_data_with_data() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let dataset = make_dataset(owner_id);
    let item = make_data(owner_id);
    let store: &dyn IngestDb = db.as_ref();
    store
        .create_dataset(dataset.clone())
        .await
        .expect("create_dataset");
    store.create_data(item.clone()).await.expect("create_data");
    store
        .attach_data_to_dataset(dataset.id, item.id)
        .await
        .expect("attach_data");

    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
    let populated = manager
        .has_data(dataset.id, owner_id)
        .await
        .expect("has_data");
    assert!(populated);
}
/// With an ACL backend, `has_data` succeeds for a principal holding "read" and fails
/// with `PermissionDenied` for one that holds nothing.
#[tokio::test]
async fn test_has_data_permission_denied_with_acl() {
    let db = fresh_db().await;
    let (owner_id, other_id) = (Uuid::new_v4(), Uuid::new_v4());
    let dataset = make_dataset(owner_id);
    let store: &dyn IngestDb = db.as_ref();
    store
        .create_dataset(dataset.clone())
        .await
        .expect("create_dataset");

    // Only the owner is registered and granted "read".
    let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
    acl.ensure_principal(owner_id, "user")
        .await
        .expect("ensure_principal");
    acl.grant_permission(owner_id, dataset.id, "read")
        .await
        .expect("grant_permission");

    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>).with_acl(acl);
    let owner_outcome = manager.has_data(dataset.id, owner_id).await;
    assert!(owner_outcome.is_ok(), "owner must be able to call has_data");

    let err = manager
        .has_data(dataset.id, other_id)
        .await
        .expect_err("must fail for unauthorized user");
    assert!(
        matches!(err, DatasetError::PermissionDenied),
        "expected PermissionDenied, got {err:?}"
    );
}
/// `list_data` returns the single data item attached to the dataset.
#[tokio::test]
async fn test_list_data() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let dataset = make_dataset(owner_id);
    let item = make_data(owner_id);
    let store: &dyn IngestDb = db.as_ref();
    store
        .create_dataset(dataset.clone())
        .await
        .expect("create_dataset");
    store.create_data(item.clone()).await.expect("create_data");
    store
        .attach_data_to_dataset(dataset.id, item.id)
        .await
        .expect("attach_data");

    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
    let listed = manager
        .list_data(dataset.id, owner_id)
        .await
        .expect("list_data");
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].id, item.id);
}
/// Items inserted as small, large, medium come back from `list_data` as
/// large, medium, small.
#[tokio::test]
async fn test_list_data_sorted_by_size_descending() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let dataset = make_dataset(owner_id);
    let store: &dyn IngestDb = db.as_ref();
    store
        .create_dataset(dataset.clone())
        .await
        .expect("create_dataset");

    let small = make_data_with_size(owner_id, 10);
    let large = make_data_with_size(owner_id, 1000);
    let medium = make_data_with_size(owner_id, 500);
    // Insertion order is deliberately not the expected output order.
    let insertion_order = [&small, &large, &medium];
    for item in insertion_order {
        store.create_data(item.clone()).await.expect("create_data");
        store
            .attach_data_to_dataset(dataset.id, item.id)
            .await
            .expect("attach_data");
    }

    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
    let listed = manager
        .list_data(dataset.id, owner_id)
        .await
        .expect("list_data");
    assert_eq!(listed.len(), 3);
    assert_eq!(listed[0].id, large.id, "largest must come first");
    assert_eq!(listed[1].id, medium.id, "medium second");
    assert_eq!(listed[2].id, small.id, "smallest last");
}
/// A dataset with no recorded pipeline runs yields an empty status map.
#[tokio::test]
async fn test_get_status_no_runs() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let dataset = make_dataset(owner_id);
    let store: &dyn IngestDb = db.as_ref();
    store
        .create_dataset(dataset.clone())
        .await
        .expect("create_dataset");

    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
    let queried = [dataset.id];
    let by_dataset = manager.get_status(&queried).await.expect("get_status");
    assert!(by_dataset.is_empty());
}
/// `discover_datasets` reports sub-directories and ignores regular files.
#[tokio::test]
async fn test_discover_datasets() {
    let tmpdir = tempfile::tempdir().expect("create temp dir");
    let root = tmpdir.path();
    std::fs::create_dir(root.join("dataset-a")).expect("create dir");
    std::fs::create_dir(root.join("dataset-b")).expect("create dir");
    std::fs::write(root.join("not-a-dataset.txt"), "hello").expect("create file");

    let mut found = DatasetManager::discover_datasets(root).expect("discover_datasets");
    // Sort before comparing so the assertion does not depend on listing order.
    found.sort();
    assert_eq!(found, vec!["dataset-a", "dataset-b"]);
}
/// Looking up a random id in an empty database yields `DatasetError::NotFound`.
#[tokio::test]
async fn test_require_dataset_not_found() {
    let db = fresh_db().await;
    let manager = DatasetManager::new(db as Arc<dyn DatasetDb>);
    let unknown_id = Uuid::new_v4();
    let lookup = manager.require_dataset(unknown_id).await;
    assert!(matches!(lookup, Err(DatasetError::NotFound)));
}
/// `create_dataset` assigns the id produced by `generate_dataset_id` and keeps the
/// supplied name and owner.
#[tokio::test]
async fn test_create_dataset_deterministic_id() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let tenant_id = Some(Uuid::new_v4());
    let expected_id = generate_dataset_id("my-ds", owner_id, tenant_id);

    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);
    let created = manager
        .create_dataset("my-ds", owner_id, tenant_id)
        .await
        .expect("create_dataset");

    assert_eq!(created.id, expected_id, "ID must match generate_dataset_id");
    assert_eq!(created.name, "my-ds");
    assert_eq!(created.owner_id, owner_id);
}
/// Creating the same (name, owner) twice returns the same id and stores one row.
#[tokio::test]
async fn test_create_dataset_idempotent() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>);

    let first = manager
        .create_dataset("dup-ds", owner_id, None)
        .await
        .expect("first create");
    let second = manager
        .create_dataset("dup-ds", owner_id, None)
        .await
        .expect("second create");
    assert_eq!(first.id, second.id, "Idempotent: same ID returned on duplicate");

    let stored = IngestDb::list_datasets_by_owner(db.as_ref(), owner_id)
        .await
        .unwrap();
    assert_eq!(stored.len(), 1, "Only one row should exist");
}
/// Without an ACL backend, `create_authorized_dataset` fails with `AclNotConfigured`.
#[tokio::test]
async fn test_create_authorized_dataset_without_acl_errors() {
    let db = fresh_db().await;
    let manager = DatasetManager::new(db as Arc<dyn DatasetDb>);
    let owner_id = Uuid::new_v4();

    let outcome = manager
        .create_authorized_dataset("auth-ds", owner_id, None, None)
        .await;
    let is_acl_error = matches!(outcome, Err(DatasetError::AclNotConfigured));
    assert!(is_acl_error, "Should error when ACL not configured");
}
/// Both the owner and a distinct parent receive every entry of `DATASET_PERMISSIONS`.
#[tokio::test]
async fn test_create_authorized_dataset_grants_four_permissions() {
    let db = fresh_db().await;
    let (owner_id, parent_id) = (Uuid::new_v4(), Uuid::new_v4());
    let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>).with_acl(acl.clone());

    let created = manager
        .create_authorized_dataset("auth-ds", owner_id, None, Some(parent_id))
        .await
        .expect("create_authorized_dataset");

    for perm in DATASET_PERMISSIONS {
        let owner_has = acl.has_permission(owner_id, created.id, perm).await.unwrap();
        assert!(owner_has, "owner must have '{perm}'");
        let parent_has = acl
            .has_permission(parent_id, created.id, perm)
            .await
            .unwrap();
        assert!(parent_has, "parent must have '{perm}'");
    }
}
/// Passing the owner as its own parent succeeds and the owner still holds every
/// entry of `DATASET_PERMISSIONS`.
#[tokio::test]
async fn test_create_authorized_dataset_parent_equals_owner_no_duplicate() {
    let db = fresh_db().await;
    let owner_id = Uuid::new_v4();
    let acl: Arc<dyn AclDb> = Arc::new(cognee_test_utils::MockAclDb::new());
    let manager = DatasetManager::new(db.clone() as Arc<dyn DatasetDb>).with_acl(acl.clone());

    let created = manager
        .create_authorized_dataset("auth-ds-self", owner_id, None, Some(owner_id))
        .await
        .expect("create_authorized_dataset with self-parent should succeed");

    for perm in DATASET_PERMISSIONS {
        let owner_has = acl.has_permission(owner_id, created.id, perm).await.unwrap();
        assert!(owner_has, "owner must have '{perm}'");
    }
}
}