// cognee_database/traits/ingest_db.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use cognee_models::{Data, Dataset};
4use sea_orm::DatabaseConnection;
5use uuid::Uuid;
6
7use crate::ops::{data, datasets, pipeline_runs};
8use crate::types::{DatabaseError, PipelineRunStatus};
9
10#[async_trait]
11pub trait IngestDb: Send + Sync {
12    async fn get_dataset_by_name(
13        &self,
14        name: &str,
15        owner_id: Uuid,
16        tenant_id: Option<Uuid>,
17    ) -> Result<Option<Dataset>, DatabaseError>;
18
19    /// Look up a dataset by its UUID.
20    async fn get_dataset(&self, id: Uuid) -> Result<Option<Dataset>, DatabaseError>;
21
22    async fn create_dataset(&self, dataset: Dataset) -> Result<Dataset, DatabaseError>;
23
24    /// List all datasets owned by the given user.
25    async fn list_datasets_by_owner(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatabaseError>;
26
27    async fn get_data(&self, id: Uuid) -> Result<Option<Data>, DatabaseError>;
28
29    async fn create_data(&self, d: Data) -> Result<Data, DatabaseError>;
30
31    async fn attach_data_to_dataset(
32        &self,
33        dataset_id: Uuid,
34        data_id: Uuid,
35    ) -> Result<(), DatabaseError>;
36
37    /// Update the `last_accessed` timestamp on the given Data records.
38    ///
39    /// Implementations should perform a bulk `UPDATE data SET last_accessed = ?
40    /// WHERE id IN (...)` query. An empty `data_ids` slice is a no-op.
41    async fn update_last_accessed(
42        &self,
43        data_ids: &[Uuid],
44        timestamp: DateTime<Utc>,
45    ) -> Result<(), DatabaseError>;
46
47    /// Get the latest pipeline run status for a (pipeline_name, dataset_id) pair.
48    ///
49    /// Returns `None` if no matching run exists.
50    async fn get_latest_pipeline_status(
51        &self,
52        pipeline_name: &str,
53        dataset_id: Uuid,
54    ) -> Result<Option<PipelineRunStatus>, DatabaseError>;
55}
56
57#[async_trait]
58impl IngestDb for DatabaseConnection {
59    async fn get_dataset_by_name(
60        &self,
61        name: &str,
62        owner_id: Uuid,
63        tenant_id: Option<Uuid>,
64    ) -> Result<Option<Dataset>, DatabaseError> {
65        datasets::get_dataset_by_name(self, name, owner_id, tenant_id).await
66    }
67
68    async fn get_dataset(&self, id: Uuid) -> Result<Option<Dataset>, DatabaseError> {
69        datasets::get_dataset(self, id).await
70    }
71
72    async fn create_dataset(&self, dataset: Dataset) -> Result<Dataset, DatabaseError> {
73        datasets::create_dataset(self, dataset).await
74    }
75
76    async fn list_datasets_by_owner(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatabaseError> {
77        datasets::list_datasets_by_owner(self, owner_id).await
78    }
79
80    async fn get_data(&self, id: Uuid) -> Result<Option<Data>, DatabaseError> {
81        data::get_data(self, id).await
82    }
83
84    async fn create_data(&self, d: Data) -> Result<Data, DatabaseError> {
85        data::create_data(self, d).await
86    }
87
88    async fn attach_data_to_dataset(
89        &self,
90        dataset_id: Uuid,
91        data_id: Uuid,
92    ) -> Result<(), DatabaseError> {
93        datasets::attach_data_to_dataset(self, dataset_id, data_id).await
94    }
95
96    async fn update_last_accessed(
97        &self,
98        data_ids: &[Uuid],
99        timestamp: DateTime<Utc>,
100    ) -> Result<(), DatabaseError> {
101        data::update_last_accessed(self, data_ids, timestamp).await
102    }
103
104    async fn get_latest_pipeline_status(
105        &self,
106        pipeline_name: &str,
107        dataset_id: Uuid,
108    ) -> Result<Option<PipelineRunStatus>, DatabaseError> {
109        pipeline_runs::get_latest_pipeline_status(self, pipeline_name, dataset_id).await
110    }
111}