Skip to main content

cognee_database/
conversions.rs

1// UUID hex conversions from database bytes always produce valid UUIDs — any
2// failure here indicates database corruption, not a recoverable condition.
3#![allow(
4    clippy::expect_used,
5    reason = "UUID hex round-trips from DB are guaranteed by schema; failure indicates corruption"
6)]
7
8use crate::entities::{
9    data, dataset, dataset_data, edge, graph_metrics, node, pipeline_run, query, result_log,
10    task_run,
11};
12use crate::types::{
13    DatabaseError, GraphEdge, GraphMetrics, GraphNode, PipelineRun, PipelineRunStatus,
14    SearchHistoryEntry, SearchHistoryEntryType, TaskRun,
15};
16use crate::uuid_hex;
17/// Shared SeaORM ↔ domain-type conversions and error helpers used across ops modules.
18use chrono::Utc;
19use cognee_models::{Data, Dataset};
20use sea_orm::ActiveValue::Set;
21
22// ---------------------------------------------------------------------------
23// Error mapping
24// ---------------------------------------------------------------------------
25
26pub(crate) fn map_sea_err(e: sea_orm::DbErr) -> DatabaseError {
27    match &e {
28        sea_orm::DbErr::RecordNotFound(_) => DatabaseError::NotFound(e.to_string()),
29        #[cfg(any(feature = "sqlite", feature = "postgres"))]
30        sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx_err)) => {
31            let s = sqlx_err.to_string();
32            if s.contains("UNIQUE constraint failed") || s.contains("unique constraint") {
33                DatabaseError::UniqueViolation(s)
34            } else {
35                DatabaseError::QueryError(s)
36            }
37        }
38        _ => DatabaseError::QueryError(e.to_string()),
39    }
40}
41
42/// SeaORM raises an error when on_conflict do_nothing finds a duplicate.
43/// This helper treats that as a no-op success.
44pub(crate) fn ignore_do_nothing(result: Result<(), DatabaseError>) -> Result<(), DatabaseError> {
45    match result {
46        Err(DatabaseError::QueryError(ref msg))
47            if msg.contains("None of the records are inserted") =>
48        {
49            Ok(())
50        }
51        other => other,
52    }
53}
54
55// ---------------------------------------------------------------------------
56// Dataset conversions
57// ---------------------------------------------------------------------------
58
59impl From<dataset::Model> for Dataset {
60    fn from(m: dataset::Model) -> Self {
61        Self {
62            id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
63            name: m.name,
64            owner_id: uuid_hex::from_hex(&m.owner_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
65            tenant_id: uuid_hex::from_hex_opt(m.tenant_id.as_deref()).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
66            created_at: m.created_at,
67            updated_at: m.updated_at,
68        }
69    }
70}
71
72impl From<&Dataset> for dataset::ActiveModel {
73    fn from(d: &Dataset) -> Self {
74        Self {
75            id: Set(uuid_hex::to_hex(d.id)),
76            name: Set(d.name.clone()),
77            owner_id: Set(uuid_hex::to_hex(d.owner_id)),
78            tenant_id: Set(uuid_hex::to_hex_opt(d.tenant_id)),
79            created_at: Set(d.created_at),
80            updated_at: Set(d.updated_at),
81        }
82    }
83}
84
85// ---------------------------------------------------------------------------
86// Data conversions
87// ---------------------------------------------------------------------------
88
89impl From<data::Model> for Data {
90    fn from(m: data::Model) -> Self {
91        Self {
92            id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
93            name: m.name,
94            raw_data_location: m.raw_data_location,
95            original_data_location: m.original_data_location,
96            extension: m.extension,
97            mime_type: m.mime_type,
98            content_hash: m.content_hash,
99            owner_id: uuid_hex::from_hex(&m.owner_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
100            created_at: m.created_at,
101            updated_at: m.updated_at,
102            label: m.label,
103            original_extension: m.original_extension,
104            original_mime_type: m.original_mime_type,
105            loader_engine: m.loader_engine,
106            raw_content_hash: m.raw_content_hash,
107            tenant_id: uuid_hex::from_hex_opt(m.tenant_id.as_deref()).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
108            external_metadata: m.external_metadata,
109            node_set: m.node_set,
110            pipeline_status: m.pipeline_status,
111            token_count: m.token_count,
112            data_size: m.data_size,
113            last_accessed: m.last_accessed,
114            importance_weight: m.importance_weight,
115        }
116    }
117}
118
119impl From<&Data> for data::ActiveModel {
120    fn from(d: &Data) -> Self {
121        Self {
122            id: Set(uuid_hex::to_hex(d.id)),
123            name: Set(d.name.clone()),
124            raw_data_location: Set(d.raw_data_location.clone()),
125            original_data_location: Set(d.original_data_location.clone()),
126            extension: Set(d.extension.clone()),
127            mime_type: Set(d.mime_type.clone()),
128            content_hash: Set(d.content_hash.clone()),
129            owner_id: Set(uuid_hex::to_hex(d.owner_id)),
130            created_at: Set(d.created_at),
131            updated_at: Set(d.updated_at),
132            label: Set(d.label.clone()),
133            original_extension: Set(d.original_extension.clone()),
134            original_mime_type: Set(d.original_mime_type.clone()),
135            loader_engine: Set(d.loader_engine.clone()),
136            raw_content_hash: Set(d.raw_content_hash.clone()),
137            tenant_id: Set(uuid_hex::to_hex_opt(d.tenant_id)),
138            external_metadata: Set(d.external_metadata.clone()),
139            node_set: Set(d.node_set.clone()),
140            pipeline_status: Set(d.pipeline_status.clone()),
141            token_count: Set(d.token_count),
142            data_size: Set(d.data_size),
143            last_accessed: Set(d.last_accessed),
144            importance_weight: Set(d.importance_weight),
145        }
146    }
147}
148
149// ---------------------------------------------------------------------------
150// DatasetData conversions
151// ---------------------------------------------------------------------------
152
153pub(crate) fn make_dataset_data_active(
154    dataset_id: uuid::Uuid,
155    data_id: uuid::Uuid,
156) -> dataset_data::ActiveModel {
157    dataset_data::ActiveModel {
158        dataset_id: Set(uuid_hex::to_hex(dataset_id)),
159        data_id: Set(uuid_hex::to_hex(data_id)),
160        created_at: Set(Utc::now()),
161    }
162}
163
164// ---------------------------------------------------------------------------
165// Search history conversions
166// ---------------------------------------------------------------------------
167
168pub(crate) fn query_model_to_history(m: query::Model) -> SearchHistoryEntry {
169    let id = uuid_hex::from_hex(&m.id).expect(
170        "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
171    );
172    SearchHistoryEntry {
173        entry_id: id,
174        query_id: id,
175        entry_type: SearchHistoryEntryType::Query,
176        content: m.query_text,
177        query_type: Some(m.query_type),
178        user_id: uuid_hex::from_hex_opt(m.user_id.as_deref()).expect(
179            "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
180        ),
181        created_at: m.created_at,
182    }
183}
184
185pub(crate) fn result_model_to_history(m: result_log::Model) -> SearchHistoryEntry {
186    SearchHistoryEntry {
187        entry_id: uuid_hex::from_hex(&m.id).expect(
188            "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
189        ),
190        query_id: uuid_hex::from_hex(&m.query_id).expect(
191            "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
192        ),
193        entry_type: SearchHistoryEntryType::Result,
194        content: m.serialized_result,
195        query_type: None,
196        user_id: uuid_hex::from_hex_opt(m.user_id.as_deref()).expect(
197            "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
198        ),
199        created_at: m.created_at,
200    }
201}
202
203// ---------------------------------------------------------------------------
204// Graph node/edge conversions
205// ---------------------------------------------------------------------------
206
207impl From<node::Model> for GraphNode {
208    fn from(m: node::Model) -> Self {
209        Self {
210            id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
211            slug: uuid_hex::from_hex(&m.slug).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
212            user_id: uuid_hex::from_hex(&m.user_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
213            data_id: uuid_hex::from_hex(&m.data_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
214            dataset_id: uuid_hex::from_hex(&m.dataset_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
215            label: m.label,
216            node_type: m.node_type,
217            indexed_fields: m.indexed_fields,
218            attributes: m.attributes,
219            created_at: m.created_at,
220        }
221    }
222}
223
224impl From<&GraphNode> for node::ActiveModel {
225    fn from(n: &GraphNode) -> Self {
226        Self {
227            id: Set(uuid_hex::to_hex(n.id)),
228            slug: Set(uuid_hex::to_hex(n.slug)),
229            user_id: Set(uuid_hex::to_hex(n.user_id)),
230            data_id: Set(uuid_hex::to_hex(n.data_id)),
231            dataset_id: Set(uuid_hex::to_hex(n.dataset_id)),
232            label: Set(n.label.clone()),
233            node_type: Set(n.node_type.clone()),
234            indexed_fields: Set(n.indexed_fields.clone()),
235            attributes: Set(n.attributes.clone()),
236            created_at: Set(n.created_at),
237        }
238    }
239}
240
241impl From<edge::Model> for GraphEdge {
242    fn from(m: edge::Model) -> Self {
243        Self {
244            id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
245            slug: uuid_hex::from_hex(&m.slug).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
246            user_id: uuid_hex::from_hex(&m.user_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
247            data_id: uuid_hex::from_hex(&m.data_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
248            dataset_id: uuid_hex::from_hex(&m.dataset_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
249            source_node_id: uuid_hex::from_hex(&m.source_node_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
250            destination_node_id: uuid_hex::from_hex(&m.destination_node_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
251            relationship_name: m.relationship_name,
252            label: m.label,
253            attributes: m.attributes,
254            created_at: m.created_at,
255        }
256    }
257}
258
259impl From<&GraphEdge> for edge::ActiveModel {
260    fn from(e: &GraphEdge) -> Self {
261        Self {
262            id: Set(uuid_hex::to_hex(e.id)),
263            slug: Set(uuid_hex::to_hex(e.slug)),
264            user_id: Set(uuid_hex::to_hex(e.user_id)),
265            data_id: Set(uuid_hex::to_hex(e.data_id)),
266            dataset_id: Set(uuid_hex::to_hex(e.dataset_id)),
267            source_node_id: Set(uuid_hex::to_hex(e.source_node_id)),
268            destination_node_id: Set(uuid_hex::to_hex(e.destination_node_id)),
269            relationship_name: Set(e.relationship_name.clone()),
270            label: Set(e.label.clone()),
271            attributes: Set(e.attributes.clone()),
272            created_at: Set(e.created_at),
273        }
274    }
275}
276
277// ---------------------------------------------------------------------------
278// Pipeline run conversions
279// ---------------------------------------------------------------------------
280
281pub(crate) fn entity_status_to_domain(s: pipeline_run::PipelineRunStatus) -> PipelineRunStatus {
282    match s {
283        pipeline_run::PipelineRunStatus::Initiated => PipelineRunStatus::Initiated,
284        pipeline_run::PipelineRunStatus::Started => PipelineRunStatus::Started,
285        pipeline_run::PipelineRunStatus::Completed => PipelineRunStatus::Completed,
286        pipeline_run::PipelineRunStatus::Errored => PipelineRunStatus::Errored,
287    }
288}
289
290pub(crate) fn domain_status_to_entity(s: PipelineRunStatus) -> pipeline_run::PipelineRunStatus {
291    match s {
292        PipelineRunStatus::Initiated => pipeline_run::PipelineRunStatus::Initiated,
293        PipelineRunStatus::Started => pipeline_run::PipelineRunStatus::Started,
294        PipelineRunStatus::Completed => pipeline_run::PipelineRunStatus::Completed,
295        PipelineRunStatus::Errored => pipeline_run::PipelineRunStatus::Errored,
296    }
297}
298
299impl From<pipeline_run::Model> for PipelineRun {
300    fn from(m: pipeline_run::Model) -> Self {
301        Self {
302            id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
303            created_at: m.created_at,
304            status: entity_status_to_domain(m.status),
305            pipeline_run_id: uuid_hex::from_hex(&m.pipeline_run_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
306            pipeline_name: m.pipeline_name,
307            pipeline_id: uuid_hex::from_hex(&m.pipeline_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
308            dataset_id: uuid_hex::from_hex_opt(m.dataset_id.as_deref()).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
309            run_info: m.run_info,
310        }
311    }
312}
313
314impl From<&PipelineRun> for pipeline_run::ActiveModel {
315    fn from(r: &PipelineRun) -> Self {
316        Self {
317            id: Set(uuid_hex::to_hex(r.id)),
318            created_at: Set(r.created_at),
319            status: Set(domain_status_to_entity(r.status.clone())),
320            pipeline_run_id: Set(uuid_hex::to_hex(r.pipeline_run_id)),
321            pipeline_name: Set(r.pipeline_name.clone()),
322            pipeline_id: Set(uuid_hex::to_hex(r.pipeline_id)),
323            dataset_id: Set(uuid_hex::to_hex_opt(r.dataset_id)),
324            run_info: Set(r.run_info.clone()),
325        }
326    }
327}
328
329// ---------------------------------------------------------------------------
330// Task run conversions
331// ---------------------------------------------------------------------------
332
333impl From<task_run::Model> for TaskRun {
334    fn from(m: task_run::Model) -> Self {
335        Self {
336            id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
337            task_name: m.task_name,
338            created_at: m.created_at,
339            status: m.status,
340            run_info: m.run_info,
341        }
342    }
343}
344
345impl From<&TaskRun> for task_run::ActiveModel {
346    fn from(r: &TaskRun) -> Self {
347        Self {
348            id: Set(uuid_hex::to_hex(r.id)),
349            task_name: Set(r.task_name.clone()),
350            created_at: Set(r.created_at),
351            status: Set(r.status.clone()),
352            run_info: Set(r.run_info.clone()),
353        }
354    }
355}
356
357// ---------------------------------------------------------------------------
358// Graph metrics conversions
359// ---------------------------------------------------------------------------
360
361impl From<graph_metrics::Model> for GraphMetrics {
362    fn from(m: graph_metrics::Model) -> Self {
363        Self {
364            id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
365            num_tokens: m.num_tokens,
366            num_nodes: m.num_nodes,
367            num_edges: m.num_edges,
368            mean_degree: m.mean_degree,
369            edge_density: m.edge_density,
370            num_connected_components: m.num_connected_components,
371            sizes_of_connected_components: m.sizes_of_connected_components,
372            num_selfloops: m.num_selfloops,
373            diameter: m.diameter,
374            avg_shortest_path_length: m.avg_shortest_path_length,
375            avg_clustering: m.avg_clustering,
376            created_at: m.created_at,
377            updated_at: m.updated_at,
378        }
379    }
380}
381
382impl From<&GraphMetrics> for graph_metrics::ActiveModel {
383    fn from(gm: &GraphMetrics) -> Self {
384        Self {
385            id: Set(uuid_hex::to_hex(gm.id)),
386            num_tokens: Set(gm.num_tokens),
387            num_nodes: Set(gm.num_nodes),
388            num_edges: Set(gm.num_edges),
389            mean_degree: Set(gm.mean_degree),
390            edge_density: Set(gm.edge_density),
391            num_connected_components: Set(gm.num_connected_components),
392            sizes_of_connected_components: Set(gm.sizes_of_connected_components.clone()),
393            num_selfloops: Set(gm.num_selfloops),
394            diameter: Set(gm.diameter),
395            avg_shortest_path_length: Set(gm.avg_shortest_path_length),
396            avg_clustering: Set(gm.avg_clustering),
397            created_at: Set(gm.created_at),
398            updated_at: Set(gm.updated_at),
399        }
400    }
401}