1#![allow(
4 clippy::expect_used,
5 reason = "UUID hex round-trips from DB are guaranteed by schema; failure indicates corruption"
6)]
7
8use crate::entities::{
9 data, dataset, dataset_data, edge, graph_metrics, node, pipeline_run, query, result_log,
10 task_run,
11};
12use crate::types::{
13 DatabaseError, GraphEdge, GraphMetrics, GraphNode, PipelineRun, PipelineRunStatus,
14 SearchHistoryEntry, SearchHistoryEntryType, TaskRun,
15};
16use crate::uuid_hex;
17use chrono::Utc;
19use cognee_models::{Data, Dataset};
20use sea_orm::ActiveValue::Set;
21
22pub(crate) fn map_sea_err(e: sea_orm::DbErr) -> DatabaseError {
27 match &e {
28 sea_orm::DbErr::RecordNotFound(_) => DatabaseError::NotFound(e.to_string()),
29 #[cfg(any(feature = "sqlite", feature = "postgres"))]
30 sea_orm::DbErr::Exec(sea_orm::RuntimeErr::SqlxError(sqlx_err)) => {
31 let s = sqlx_err.to_string();
32 if s.contains("UNIQUE constraint failed") || s.contains("unique constraint") {
33 DatabaseError::UniqueViolation(s)
34 } else {
35 DatabaseError::QueryError(s)
36 }
37 }
38 _ => DatabaseError::QueryError(e.to_string()),
39 }
40}
41
42pub(crate) fn ignore_do_nothing(result: Result<(), DatabaseError>) -> Result<(), DatabaseError> {
45 match result {
46 Err(DatabaseError::QueryError(ref msg))
47 if msg.contains("None of the records are inserted") =>
48 {
49 Ok(())
50 }
51 other => other,
52 }
53}
54
55impl From<dataset::Model> for Dataset {
60 fn from(m: dataset::Model) -> Self {
61 Self {
62 id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
63 name: m.name,
64 owner_id: uuid_hex::from_hex(&m.owner_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
65 tenant_id: uuid_hex::from_hex_opt(m.tenant_id.as_deref()).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
66 created_at: m.created_at,
67 updated_at: m.updated_at,
68 }
69 }
70}
71
72impl From<&Dataset> for dataset::ActiveModel {
73 fn from(d: &Dataset) -> Self {
74 Self {
75 id: Set(uuid_hex::to_hex(d.id)),
76 name: Set(d.name.clone()),
77 owner_id: Set(uuid_hex::to_hex(d.owner_id)),
78 tenant_id: Set(uuid_hex::to_hex_opt(d.tenant_id)),
79 created_at: Set(d.created_at),
80 updated_at: Set(d.updated_at),
81 }
82 }
83}
84
85impl From<data::Model> for Data {
90 fn from(m: data::Model) -> Self {
91 Self {
92 id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
93 name: m.name,
94 raw_data_location: m.raw_data_location,
95 original_data_location: m.original_data_location,
96 extension: m.extension,
97 mime_type: m.mime_type,
98 content_hash: m.content_hash,
99 owner_id: uuid_hex::from_hex(&m.owner_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
100 created_at: m.created_at,
101 updated_at: m.updated_at,
102 label: m.label,
103 original_extension: m.original_extension,
104 original_mime_type: m.original_mime_type,
105 loader_engine: m.loader_engine,
106 raw_content_hash: m.raw_content_hash,
107 tenant_id: uuid_hex::from_hex_opt(m.tenant_id.as_deref()).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
108 external_metadata: m.external_metadata,
109 node_set: m.node_set,
110 pipeline_status: m.pipeline_status,
111 token_count: m.token_count,
112 data_size: m.data_size,
113 last_accessed: m.last_accessed,
114 importance_weight: m.importance_weight,
115 }
116 }
117}
118
119impl From<&Data> for data::ActiveModel {
120 fn from(d: &Data) -> Self {
121 Self {
122 id: Set(uuid_hex::to_hex(d.id)),
123 name: Set(d.name.clone()),
124 raw_data_location: Set(d.raw_data_location.clone()),
125 original_data_location: Set(d.original_data_location.clone()),
126 extension: Set(d.extension.clone()),
127 mime_type: Set(d.mime_type.clone()),
128 content_hash: Set(d.content_hash.clone()),
129 owner_id: Set(uuid_hex::to_hex(d.owner_id)),
130 created_at: Set(d.created_at),
131 updated_at: Set(d.updated_at),
132 label: Set(d.label.clone()),
133 original_extension: Set(d.original_extension.clone()),
134 original_mime_type: Set(d.original_mime_type.clone()),
135 loader_engine: Set(d.loader_engine.clone()),
136 raw_content_hash: Set(d.raw_content_hash.clone()),
137 tenant_id: Set(uuid_hex::to_hex_opt(d.tenant_id)),
138 external_metadata: Set(d.external_metadata.clone()),
139 node_set: Set(d.node_set.clone()),
140 pipeline_status: Set(d.pipeline_status.clone()),
141 token_count: Set(d.token_count),
142 data_size: Set(d.data_size),
143 last_accessed: Set(d.last_accessed),
144 importance_weight: Set(d.importance_weight),
145 }
146 }
147}
148
149pub(crate) fn make_dataset_data_active(
154 dataset_id: uuid::Uuid,
155 data_id: uuid::Uuid,
156) -> dataset_data::ActiveModel {
157 dataset_data::ActiveModel {
158 dataset_id: Set(uuid_hex::to_hex(dataset_id)),
159 data_id: Set(uuid_hex::to_hex(data_id)),
160 created_at: Set(Utc::now()),
161 }
162}
163
164pub(crate) fn query_model_to_history(m: query::Model) -> SearchHistoryEntry {
169 let id = uuid_hex::from_hex(&m.id).expect(
170 "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
171 );
172 SearchHistoryEntry {
173 entry_id: id,
174 query_id: id,
175 entry_type: SearchHistoryEntryType::Query,
176 content: m.query_text,
177 query_type: Some(m.query_type),
178 user_id: uuid_hex::from_hex_opt(m.user_id.as_deref()).expect(
179 "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
180 ),
181 created_at: m.created_at,
182 }
183}
184
185pub(crate) fn result_model_to_history(m: result_log::Model) -> SearchHistoryEntry {
186 SearchHistoryEntry {
187 entry_id: uuid_hex::from_hex(&m.id).expect(
188 "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
189 ),
190 query_id: uuid_hex::from_hex(&m.query_id).expect(
191 "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
192 ),
193 entry_type: SearchHistoryEntryType::Result,
194 content: m.serialized_result,
195 query_type: None,
196 user_id: uuid_hex::from_hex_opt(m.user_id.as_deref()).expect(
197 "DB stores only valid UUID hex strings; corruption indicates data integrity failure",
198 ),
199 created_at: m.created_at,
200 }
201}
202
203impl From<node::Model> for GraphNode {
208 fn from(m: node::Model) -> Self {
209 Self {
210 id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
211 slug: uuid_hex::from_hex(&m.slug).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
212 user_id: uuid_hex::from_hex(&m.user_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
213 data_id: uuid_hex::from_hex(&m.data_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
214 dataset_id: uuid_hex::from_hex(&m.dataset_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
215 label: m.label,
216 node_type: m.node_type,
217 indexed_fields: m.indexed_fields,
218 attributes: m.attributes,
219 created_at: m.created_at,
220 }
221 }
222}
223
224impl From<&GraphNode> for node::ActiveModel {
225 fn from(n: &GraphNode) -> Self {
226 Self {
227 id: Set(uuid_hex::to_hex(n.id)),
228 slug: Set(uuid_hex::to_hex(n.slug)),
229 user_id: Set(uuid_hex::to_hex(n.user_id)),
230 data_id: Set(uuid_hex::to_hex(n.data_id)),
231 dataset_id: Set(uuid_hex::to_hex(n.dataset_id)),
232 label: Set(n.label.clone()),
233 node_type: Set(n.node_type.clone()),
234 indexed_fields: Set(n.indexed_fields.clone()),
235 attributes: Set(n.attributes.clone()),
236 created_at: Set(n.created_at),
237 }
238 }
239}
240
241impl From<edge::Model> for GraphEdge {
242 fn from(m: edge::Model) -> Self {
243 Self {
244 id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
245 slug: uuid_hex::from_hex(&m.slug).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
246 user_id: uuid_hex::from_hex(&m.user_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
247 data_id: uuid_hex::from_hex(&m.data_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
248 dataset_id: uuid_hex::from_hex(&m.dataset_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
249 source_node_id: uuid_hex::from_hex(&m.source_node_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
250 destination_node_id: uuid_hex::from_hex(&m.destination_node_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
251 relationship_name: m.relationship_name,
252 label: m.label,
253 attributes: m.attributes,
254 created_at: m.created_at,
255 }
256 }
257}
258
259impl From<&GraphEdge> for edge::ActiveModel {
260 fn from(e: &GraphEdge) -> Self {
261 Self {
262 id: Set(uuid_hex::to_hex(e.id)),
263 slug: Set(uuid_hex::to_hex(e.slug)),
264 user_id: Set(uuid_hex::to_hex(e.user_id)),
265 data_id: Set(uuid_hex::to_hex(e.data_id)),
266 dataset_id: Set(uuid_hex::to_hex(e.dataset_id)),
267 source_node_id: Set(uuid_hex::to_hex(e.source_node_id)),
268 destination_node_id: Set(uuid_hex::to_hex(e.destination_node_id)),
269 relationship_name: Set(e.relationship_name.clone()),
270 label: Set(e.label.clone()),
271 attributes: Set(e.attributes.clone()),
272 created_at: Set(e.created_at),
273 }
274 }
275}
276
277pub(crate) fn entity_status_to_domain(s: pipeline_run::PipelineRunStatus) -> PipelineRunStatus {
282 match s {
283 pipeline_run::PipelineRunStatus::Initiated => PipelineRunStatus::Initiated,
284 pipeline_run::PipelineRunStatus::Started => PipelineRunStatus::Started,
285 pipeline_run::PipelineRunStatus::Completed => PipelineRunStatus::Completed,
286 pipeline_run::PipelineRunStatus::Errored => PipelineRunStatus::Errored,
287 }
288}
289
290pub(crate) fn domain_status_to_entity(s: PipelineRunStatus) -> pipeline_run::PipelineRunStatus {
291 match s {
292 PipelineRunStatus::Initiated => pipeline_run::PipelineRunStatus::Initiated,
293 PipelineRunStatus::Started => pipeline_run::PipelineRunStatus::Started,
294 PipelineRunStatus::Completed => pipeline_run::PipelineRunStatus::Completed,
295 PipelineRunStatus::Errored => pipeline_run::PipelineRunStatus::Errored,
296 }
297}
298
299impl From<pipeline_run::Model> for PipelineRun {
300 fn from(m: pipeline_run::Model) -> Self {
301 Self {
302 id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
303 created_at: m.created_at,
304 status: entity_status_to_domain(m.status),
305 pipeline_run_id: uuid_hex::from_hex(&m.pipeline_run_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
306 pipeline_name: m.pipeline_name,
307 pipeline_id: uuid_hex::from_hex(&m.pipeline_id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
308 dataset_id: uuid_hex::from_hex_opt(m.dataset_id.as_deref()).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
309 run_info: m.run_info,
310 }
311 }
312}
313
314impl From<&PipelineRun> for pipeline_run::ActiveModel {
315 fn from(r: &PipelineRun) -> Self {
316 Self {
317 id: Set(uuid_hex::to_hex(r.id)),
318 created_at: Set(r.created_at),
319 status: Set(domain_status_to_entity(r.status.clone())),
320 pipeline_run_id: Set(uuid_hex::to_hex(r.pipeline_run_id)),
321 pipeline_name: Set(r.pipeline_name.clone()),
322 pipeline_id: Set(uuid_hex::to_hex(r.pipeline_id)),
323 dataset_id: Set(uuid_hex::to_hex_opt(r.dataset_id)),
324 run_info: Set(r.run_info.clone()),
325 }
326 }
327}
328
329impl From<task_run::Model> for TaskRun {
334 fn from(m: task_run::Model) -> Self {
335 Self {
336 id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
337 task_name: m.task_name,
338 created_at: m.created_at,
339 status: m.status,
340 run_info: m.run_info,
341 }
342 }
343}
344
345impl From<&TaskRun> for task_run::ActiveModel {
346 fn from(r: &TaskRun) -> Self {
347 Self {
348 id: Set(uuid_hex::to_hex(r.id)),
349 task_name: Set(r.task_name.clone()),
350 created_at: Set(r.created_at),
351 status: Set(r.status.clone()),
352 run_info: Set(r.run_info.clone()),
353 }
354 }
355}
356
357impl From<graph_metrics::Model> for GraphMetrics {
362 fn from(m: graph_metrics::Model) -> Self {
363 Self {
364 id: uuid_hex::from_hex(&m.id).expect("DB stores only valid UUID hex strings; corruption indicates data integrity failure"),
365 num_tokens: m.num_tokens,
366 num_nodes: m.num_nodes,
367 num_edges: m.num_edges,
368 mean_degree: m.mean_degree,
369 edge_density: m.edge_density,
370 num_connected_components: m.num_connected_components,
371 sizes_of_connected_components: m.sizes_of_connected_components,
372 num_selfloops: m.num_selfloops,
373 diameter: m.diameter,
374 avg_shortest_path_length: m.avg_shortest_path_length,
375 avg_clustering: m.avg_clustering,
376 created_at: m.created_at,
377 updated_at: m.updated_at,
378 }
379 }
380}
381
382impl From<&GraphMetrics> for graph_metrics::ActiveModel {
383 fn from(gm: &GraphMetrics) -> Self {
384 Self {
385 id: Set(uuid_hex::to_hex(gm.id)),
386 num_tokens: Set(gm.num_tokens),
387 num_nodes: Set(gm.num_nodes),
388 num_edges: Set(gm.num_edges),
389 mean_degree: Set(gm.mean_degree),
390 edge_density: Set(gm.edge_density),
391 num_connected_components: Set(gm.num_connected_components),
392 sizes_of_connected_components: Set(gm.sizes_of_connected_components.clone()),
393 num_selfloops: Set(gm.num_selfloops),
394 diameter: Set(gm.diameter),
395 avg_shortest_path_length: Set(gm.avg_shortest_path_length),
396 avg_clustering: Set(gm.avg_clustering),
397 created_at: Set(gm.created_at),
398 updated_at: Set(gm.updated_at),
399 }
400 }
401}