cognee_database/traits/
ingest_db.rs1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use cognee_models::{Data, Dataset};
4use sea_orm::DatabaseConnection;
5use uuid::Uuid;
6
7use crate::ops::{data, datasets, pipeline_runs};
8use crate::types::{DatabaseError, PipelineRunStatus};
9
10#[async_trait]
11pub trait IngestDb: Send + Sync {
12 async fn get_dataset_by_name(
13 &self,
14 name: &str,
15 owner_id: Uuid,
16 tenant_id: Option<Uuid>,
17 ) -> Result<Option<Dataset>, DatabaseError>;
18
19 async fn get_dataset(&self, id: Uuid) -> Result<Option<Dataset>, DatabaseError>;
21
22 async fn create_dataset(&self, dataset: Dataset) -> Result<Dataset, DatabaseError>;
23
24 async fn list_datasets_by_owner(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatabaseError>;
26
27 async fn get_data(&self, id: Uuid) -> Result<Option<Data>, DatabaseError>;
28
29 async fn create_data(&self, d: Data) -> Result<Data, DatabaseError>;
30
31 async fn attach_data_to_dataset(
32 &self,
33 dataset_id: Uuid,
34 data_id: Uuid,
35 ) -> Result<(), DatabaseError>;
36
37 async fn update_last_accessed(
42 &self,
43 data_ids: &[Uuid],
44 timestamp: DateTime<Utc>,
45 ) -> Result<(), DatabaseError>;
46
47 async fn get_latest_pipeline_status(
51 &self,
52 pipeline_name: &str,
53 dataset_id: Uuid,
54 ) -> Result<Option<PipelineRunStatus>, DatabaseError>;
55}
56
57#[async_trait]
58impl IngestDb for DatabaseConnection {
59 async fn get_dataset_by_name(
60 &self,
61 name: &str,
62 owner_id: Uuid,
63 tenant_id: Option<Uuid>,
64 ) -> Result<Option<Dataset>, DatabaseError> {
65 datasets::get_dataset_by_name(self, name, owner_id, tenant_id).await
66 }
67
68 async fn get_dataset(&self, id: Uuid) -> Result<Option<Dataset>, DatabaseError> {
69 datasets::get_dataset(self, id).await
70 }
71
72 async fn create_dataset(&self, dataset: Dataset) -> Result<Dataset, DatabaseError> {
73 datasets::create_dataset(self, dataset).await
74 }
75
76 async fn list_datasets_by_owner(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatabaseError> {
77 datasets::list_datasets_by_owner(self, owner_id).await
78 }
79
80 async fn get_data(&self, id: Uuid) -> Result<Option<Data>, DatabaseError> {
81 data::get_data(self, id).await
82 }
83
84 async fn create_data(&self, d: Data) -> Result<Data, DatabaseError> {
85 data::create_data(self, d).await
86 }
87
88 async fn attach_data_to_dataset(
89 &self,
90 dataset_id: Uuid,
91 data_id: Uuid,
92 ) -> Result<(), DatabaseError> {
93 datasets::attach_data_to_dataset(self, dataset_id, data_id).await
94 }
95
96 async fn update_last_accessed(
97 &self,
98 data_ids: &[Uuid],
99 timestamp: DateTime<Utc>,
100 ) -> Result<(), DatabaseError> {
101 data::update_last_accessed(self, data_ids, timestamp).await
102 }
103
104 async fn get_latest_pipeline_status(
105 &self,
106 pipeline_name: &str,
107 dataset_id: Uuid,
108 ) -> Result<Option<PipelineRunStatus>, DatabaseError> {
109 pipeline_runs::get_latest_pipeline_status(self, pipeline_name, dataset_id).await
110 }
111}