1use std::collections::HashMap;
12use std::path::Path;
13use std::sync::Arc;
14use tracing::{info, instrument};
15use uuid::Uuid;
16
17use cognee_core::CpuPool;
18#[cfg(test)]
19use cognee_core::RayonThreadPool;
20use cognee_core::pipeline::DataIdFn;
21use cognee_core::pipeline_run_registry::DbPipelineWatcher;
22use cognee_core::task::Value;
23use cognee_core::{Pipeline, PipelineBuilder, PipelineContext, TaskContextBuilder, TypedTask};
24use cognee_database::{AclDb, DatabaseConnection, IngestDb, PipelineRunRepository};
25use cognee_graph::GraphDBTrait;
26use cognee_models::{Data, DataInput, DataPoint, Dataset, Document};
27use cognee_storage::StorageTrait;
28use cognee_vector::VectorDB;
29
30use crate::content_hasher::HashAlgorithm;
31use crate::id_generation::{generate_data_id, generate_dataset_id};
32use crate::loader_registry::get_loader_name;
33use crate::loaders::{LoaderOutput, LoaderRegistry};
34use crate::url_resolver::UrlMetadata;
35#[cfg(feature = "html-loader")]
36use crate::url_resolver::resolve_url_input;
37
/// Optional per-call settings for [`AddPipeline::add_with_params`].
///
/// `AddParams::default()` is what [`AddPipeline::add`] uses.
#[derive(Debug, Clone, Default)]
pub struct AddParams {
    /// Node-set tags. `add_with_params` serialises this list to a JSON string and stores it
    /// on every *newly created* `Data` row (existing duplicates are returned unchanged).
    pub node_set: Option<Vec<String>>,

    /// Existing dataset to add into. When set, the dataset is looked up by id and the call
    /// fails if it does not exist; the `dataset_name` argument is then not used to find or
    /// create a dataset. Also copied into the pipeline context.
    pub dataset_id: Option<Uuid>,

    /// Loader overrides keyed by string.
    /// NOTE(review): not read anywhere in this file — confirm whether it is honoured elsewhere
    /// or currently ignored by `add_with_params`.
    pub preferred_loaders: Option<HashMap<String, String>>,

    /// Importance weight stored on every *newly created* `Data` row.
    pub importance_weight: Option<f64>,

    /// NOTE(review): not read anywhere in this file — confirm where incremental loading is
    /// implemented, if at all.
    pub incremental_loading: bool,
}
67
/// Result of [`process_input`]: everything needed to persist one ingested item as a `Data` row.
#[derive(Debug, Clone)]
pub struct ProcessedInput {
    /// Hash (configured algorithm) of all bytes read from the effective input, i.e. after URL
    /// resolution. `process_input` derives `data_id` from it.
    pub content_hash: String,
    /// Hash of the payload actually stored. As set by `process_input`, this equals
    /// `content_hash` unless a loader extracted text, in which case it hashes that text.
    pub raw_content_hash: String,
    /// Deterministic id from `generate_data_id(content_hash, owner_id, tenant_id)`.
    pub data_id: Uuid,
    /// Location returned by `StorageTrait::store` for the stored payload.
    pub storage_location: String,
    /// Label from a `DataInput::DataItem` wrapper, if any.
    pub label: Option<String>,
    /// Extension of the stored payload (`"txt"` once a loader has extracted text).
    pub stored_extension: String,
    /// MIME type of the stored payload (`"text/plain"` once a loader has extracted text).
    pub stored_mime_type: String,
    /// Extension of the source before any extraction.
    pub original_extension: String,
    /// MIME type of the source before any extraction.
    pub original_mime_type: String,
    /// Name of the loader that produced (or will read) the stored payload.
    pub loader_engine: String,
    /// Number of bytes read from the effective input (before any loader extraction).
    pub data_size: i64,
    /// Display name derived by `extract_name`.
    pub name: String,
    /// `storage_location` as a `file://` URI when the storage has a base path, otherwise the
    /// bare location.
    pub raw_data_uri: String,
    /// Where the data came from: the stored raw HTML for HTML URLs, the stored copy for inline
    /// text, otherwise the input's own location.
    pub original_location: String,
    /// URI of the raw HTML/XHTML source stored alongside the extracted text; `None` otherwise.
    pub raw_source_uri: Option<String>,
    pub owner_id: Uuid,
    pub tenant_id: Option<Uuid>,
    /// Caller metadata; for URL inputs, merged with URL metadata into a JSON object string.
    pub external_metadata: Option<String>,
    /// JSON-encoded node set. `process_input` leaves it `None`; the persist task fills it from
    /// the call's add parameters.
    pub node_set: Option<String>,
    /// Importance weight. `process_input` leaves it `None`; the persist task fills it from the
    /// call's add parameters.
    pub importance_weight: Option<f64>,
}
103
104#[instrument(name = "ingestion.process_input", skip(input, storage))]
113pub async fn process_input(
114 input: &DataInput,
115 storage: &dyn StorageTrait,
116 hash_algorithm: HashAlgorithm,
117 owner_id: Uuid,
118 tenant_id: Option<Uuid>,
119) -> Result<ProcessedInput, Box<dyn std::error::Error>> {
120 use tokio::sync::Mutex;
121
122 let (effective_input, resolved_url_metadata, resolved_label, data_item_metadata) =
123 resolve_input_for_processing(input).await?;
124
125 let (
128 file_name,
129 stored_extension,
130 stored_mime_type,
131 original_extension,
132 original_mime_type,
133 label,
134 loader_engine,
135 ) = if let Some(metadata) = resolved_url_metadata.as_ref() {
136 let fname = format!("text_placeholder.{}", metadata.stored_extension);
137 (
138 fname,
139 metadata.stored_extension.clone(),
140 metadata.stored_mime_type.clone(),
141 metadata.source_extension.clone(),
142 metadata.source_mime_type.clone(),
143 resolved_label,
144 metadata.loader_engine.clone(),
145 )
146 } else {
147 let (fname, ext, mime, lbl) = extract_file_metadata(input);
148 let loader = get_loader_name(&ext).to_string();
149 (fname, ext.clone(), mime.clone(), ext, mime, lbl, loader)
150 };
151
152 if matches!(unwrap_data_item(input), DataInput::S3Path(_)) {
156 return Err(Box::new(IngestionError::S3IngestionUnavailable));
157 }
158
159 let raw_source_uri = if let Some(metadata) = resolved_url_metadata.as_ref()
160 && is_html_url_metadata(metadata)
161 {
162 let raw_file_name = format!("source_placeholder.{}", metadata.source_extension);
163 let raw_location = storage.store(&metadata.raw_bytes, &raw_file_name).await?;
164 Some(storage_location_to_uri(storage.base_path(), &raw_location))
165 } else {
166 None
167 };
168
169 let mut stored_extension = stored_extension;
173 let mut stored_mime_type = stored_mime_type;
174 let mut loader_engine = loader_engine;
175
176 let size_counter: Arc<Mutex<i64>> = Arc::new(Mutex::new(0i64));
180 let raw_bytes: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
181
182 let size_clone = size_counter.clone();
183 let raw_bytes_clone = raw_bytes.clone();
184
185 effective_input
186 .process_by_chunks(move |chunk| {
187 let size = size_clone.clone();
188 let bytes = raw_bytes_clone.clone();
189 let chunk_owned = chunk.to_vec();
190 async move {
191 *size.lock().await += chunk_owned.len() as i64;
192 bytes.lock().await.extend_from_slice(&chunk_owned);
193 Ok::<(), Box<dyn std::error::Error>>(())
194 }
195 })
196 .await?;
197
198 let collected = Arc::try_unwrap(raw_bytes)
202 .map_err(|_| "Failed to unwrap bytes")?
203 .into_inner();
204 let content_hash =
205 crate::content_hasher::ContentHasher::hash_content(&collected, hash_algorithm);
206 let data_size = Arc::try_unwrap(size_counter)
207 .map_err(|_| "Failed to unwrap size counter")?
208 .into_inner();
209
210 let data_id = generate_data_id(&content_hash, owner_id, tenant_id);
211
212 let (storage_location, raw_content_hash) = if resolved_url_metadata.is_some() {
215 let location = storage.store(&collected, &file_name).await?;
221 (location, content_hash.clone())
222 } else {
223 let registry = LoaderRegistry::default_registry();
227 let doc_type = cognee_models::doc_type_for_extension(&original_extension)
228 .unwrap_or("text")
229 .to_string();
230
231 let is_text_doc_type = doc_type == "text";
246 if is_text_doc_type && std::str::from_utf8(&collected).is_err() {
247 let stored_name = format!("text_{content_hash}.bin");
248 let location = storage.store(&collected, &stored_name).await?;
249 (location, content_hash.clone())
252 } else {
253 let loader = registry.get(&doc_type).ok_or_else(|| {
254 Box::new(IngestionError::UnsupportedDocumentType {
255 document_type: doc_type.clone(),
256 }) as Box<dyn std::error::Error>
257 })?;
258
259 let descriptor = build_loader_descriptor(
262 data_id,
263 &extract_name(input, &content_hash),
264 &original_extension,
265 &original_mime_type,
266 );
267
268 let extracted_text = match loader.extract(&collected, &descriptor).await? {
269 LoaderOutput::Text(t) => t,
270 LoaderOutput::Rows(rows) => rows.join("\n\n"),
271 LoaderOutput::SingleChunk { text, .. } => text,
272 };
273 let extracted_bytes = extracted_text.into_bytes();
274
275 let stored_name = format!("text_{content_hash}.txt");
276 let location = storage.store(&extracted_bytes, &stored_name).await?;
277
278 stored_extension = "txt".to_string();
280 stored_mime_type = "text/plain".to_string();
281 loader_engine = loader.engine_name().to_string();
282
283 let raw_content_hash = crate::content_hasher::ContentHasher::hash_content(
284 &extracted_bytes,
285 hash_algorithm,
286 );
287 (location, raw_content_hash)
288 }
289 };
290
291 let raw_data_uri = storage_location_to_uri(storage.base_path(), &storage_location);
293 let name = extract_name(input, &content_hash);
294 let original_location = if let Some(uri) = raw_source_uri.clone() {
295 uri
296 } else {
297 match input {
298 DataInput::Text(_) => raw_data_uri.clone(),
299 _ => extract_original_location(input),
300 }
301 };
302 let external_metadata =
303 merge_external_metadata(data_item_metadata, resolved_url_metadata.as_ref())?;
304
305 Ok(ProcessedInput {
306 content_hash,
307 raw_content_hash,
308 data_id,
309 storage_location,
310 label,
311 stored_extension,
312 stored_mime_type,
313 original_extension,
314 original_mime_type,
315 loader_engine: loader_engine.to_string(),
316 data_size,
317 name,
318 raw_data_uri,
319 original_location,
320 raw_source_uri,
321 owner_id,
322 tenant_id,
323 external_metadata,
324 node_set: None,
325 importance_weight: None,
326 })
327}
328
329#[instrument(
342 name = "ingestion.persist_data",
343 skip(processed, database),
344 fields(data_id = %processed.data_id)
345)]
346pub async fn persist_data(
347 processed: &ProcessedInput,
348 database: &dyn IngestDb,
349 dataset_name: &str,
350 owner_id: Uuid,
351 tenant_id: Option<Uuid>,
352) -> Result<Data, Box<dyn std::error::Error>> {
353 persist_data_with_acl(
354 processed,
355 database,
356 dataset_name,
357 owner_id,
358 tenant_id,
359 None,
360 None,
361 )
362 .await
363}
364
365#[instrument(
375 name = "ingestion.persist_data_with_acl",
376 skip(processed, database, acl_db),
377 fields(data_id = %processed.data_id)
378)]
379pub async fn persist_data_with_acl(
380 processed: &ProcessedInput,
381 database: &dyn IngestDb,
382 dataset_name: &str,
383 owner_id: Uuid,
384 tenant_id: Option<Uuid>,
385 acl_db: Option<&dyn AclDb>,
386 target_dataset_id: Option<Uuid>,
387) -> Result<Data, Box<dyn std::error::Error>> {
388 let is_new_dataset;
390 let dataset = if let Some(ds_id) = target_dataset_id {
391 match database.get_dataset(ds_id).await? {
392 Some(ds) => {
393 is_new_dataset = false;
394 ds
395 }
396 None => {
397 return Err(format!("Dataset with id {ds_id} not found").into());
398 }
399 }
400 } else {
401 let generated_id = generate_dataset_id(dataset_name, owner_id, tenant_id);
402 match database
403 .get_dataset_by_name(dataset_name, owner_id, tenant_id)
404 .await?
405 {
406 Some(ds) => {
407 is_new_dataset = false;
408 ds
409 }
410 None => {
411 is_new_dataset = true;
412 let new_dataset =
413 Dataset::new(dataset_name.to_string(), owner_id, tenant_id, generated_id);
414 database.create_dataset(new_dataset).await?
415 }
416 }
417 };
418 info!(dataset_id = %dataset.id, "dataset resolved");
419
420 if is_new_dataset && let Some(acl) = acl_db {
422 cognee_database::ops::acl::grant_all_permissions_on_dataset_via_trait(
423 acl, owner_id, dataset.id,
424 )
425 .await?;
426 info!(
427 dataset_id = %dataset.id,
428 owner_id = %owner_id,
429 "ACL permissions granted on new dataset"
430 );
431 }
432
433 let data_id = processed.data_id;
434
435 if let Some(existing_data) = database.get_data(data_id).await? {
436 database.attach_data_to_dataset(dataset.id, data_id).await?;
437 info!(data_id = %data_id, is_duplicate = true, "input processed");
438 return Ok(existing_data);
439 }
440
441 let mut data_builder = Data::builder(
442 data_id,
443 processed.name.clone(),
444 processed.raw_data_uri.clone(),
445 processed.original_location.clone(),
446 processed.stored_extension.clone(),
447 processed.stored_mime_type.clone(),
448 processed.content_hash.clone(),
449 processed.owner_id,
450 )
451 .original_extension(processed.original_extension.clone())
452 .original_mime_type(processed.original_mime_type.clone())
453 .loader_engine(processed.loader_engine.clone())
454 .raw_content_hash(processed.raw_content_hash.clone())
455 .data_size(processed.data_size);
456 if let Some(tid) = processed.tenant_id {
457 data_builder = data_builder.tenant_id(tid);
458 }
459 if let Some(ref lbl) = processed.label {
460 data_builder = data_builder.label(lbl.clone());
461 }
462 if let Some(ref meta) = processed.external_metadata {
463 data_builder = data_builder.external_metadata(meta.clone());
464 }
465 if let Some(ref ns) = processed.node_set {
466 data_builder = data_builder.node_set(ns.clone());
467 }
468 if let Some(w) = processed.importance_weight {
469 data_builder = data_builder.importance_weight(w);
470 }
471 let data = data_builder.build();
472
473 let saved_data = database.create_data(data).await?;
474
475 database.attach_data_to_dataset(dataset.id, data_id).await?;
476
477 info!(data_id = %data_id, is_duplicate = false, "input processed");
478 Ok(saved_data)
479}
480
481fn resolve_mime(extension: &str, path_for_guess: &str) -> String {
492 if get_loader_name(extension) == "text_loader" {
493 "text/plain".to_string()
494 } else {
495 mime_guess::from_path(path_for_guess)
496 .first_or_octet_stream()
497 .to_string()
498 }
499}
500
501fn unwrap_data_item(input: &DataInput) -> &DataInput {
505 match input {
506 DataInput::DataItem { data, .. } => unwrap_data_item(data),
507 other => other,
508 }
509}
510
511fn build_loader_descriptor(
515 data_id: Uuid,
516 name: &str,
517 extension: &str,
518 mime_type: &str,
519) -> Document {
520 let doc_type = cognee_models::doc_type_for_extension(extension)
521 .unwrap_or("text")
522 .to_string();
523 let mut base = DataPoint::new("Document", None);
524 base.id = data_id;
525 Document {
526 base,
527 document_type: doc_type,
528 name: name.to_string(),
529 raw_data_location: String::new(),
530 mime_type: mime_type.to_string(),
531 extension: extension.to_string(),
532 data_id,
533 external_metadata: None,
534 }
535}
536
537fn extract_file_metadata(input: &DataInput) -> (String, String, String, Option<String>) {
539 match input {
540 DataInput::FilePath(path) => {
541 let clean_path = path.strip_prefix("file://").unwrap_or(path);
542 let p = Path::new(clean_path);
543 let file_name = p
544 .file_name()
545 .and_then(|n| n.to_str())
546 .unwrap_or("file.bin")
547 .to_string();
548 let extension = p
549 .extension()
550 .and_then(|e| e.to_str())
551 .unwrap_or("")
552 .to_string();
553 let mime = resolve_mime(&extension, clean_path);
554 (file_name, extension, mime, None)
555 }
556 DataInput::Text(_) => {
557 (
559 "text_placeholder.txt".to_string(),
560 "txt".to_string(),
561 "text/plain".to_string(),
562 None,
563 )
564 }
565 DataInput::Url(_url) => {
566 (
569 "text_placeholder.txt".to_string(),
570 "html".to_string(),
571 "text/html".to_string(),
572 None,
573 )
574 }
575 DataInput::S3Path(_) => (
577 "s3_file.bin".to_string(),
578 "bin".to_string(),
579 "application/octet-stream".to_string(),
580 None,
581 ),
582 DataInput::Binary { name, .. } => {
583 let ext = Path::new(name)
584 .extension()
585 .and_then(|e| e.to_str())
586 .unwrap_or("bin")
587 .to_string();
588 let mime = resolve_mime(&ext, name);
589 (name.clone(), ext, mime, None)
590 }
591 DataInput::DataItem { data, label, .. } => {
592 let (file_name, ext, mime, _) = extract_file_metadata(data);
593 (file_name, ext, mime, Some(label.clone()))
594 }
595 }
596}
597
/// Turns a storage location into the URI recorded on `Data` rows.
///
/// With an empty `base_path`, the location is returned unchanged. Otherwise the location is
/// joined onto `base_path`, made absolute against the current directory if needed, and
/// prefixed with `file://`.
fn storage_location_to_uri(base_path: &str, location: &str) -> String {
    if base_path.is_empty() {
        return location.to_string();
    }
    let mut full = Path::new(base_path).join(location);
    if !full.is_absolute() {
        full = std::env::current_dir().unwrap_or_default().join(full);
    }
    format!("file://{}", full.display())
}
620
621async fn resolve_input_for_processing(
622 input: &DataInput,
623) -> Result<
624 (
625 DataInput,
626 Option<UrlMetadata>,
627 Option<String>,
628 Option<String>,
629 ),
630 Box<dyn std::error::Error>,
631> {
632 match input {
633 DataInput::Url(url) => {
634 #[cfg(feature = "html-loader")]
635 {
636 let resolved = resolve_url_input(url).await?;
637 Ok((resolved.input, Some(resolved.metadata), None, None))
638 }
639 #[cfg(not(feature = "html-loader"))]
640 {
641 let _ = url;
642 Err(Box::new(IngestionError::UrlIngestionUnavailable))
643 }
644 }
645 DataInput::DataItem {
646 data,
647 label,
648 external_metadata,
649 } => {
650 if let DataInput::Url(_url) = data.as_ref() {
651 #[cfg(feature = "html-loader")]
652 {
653 let resolved = resolve_url_input(_url).await?;
654 Ok((
655 resolved.input,
656 Some(resolved.metadata),
657 Some(label.clone()),
658 external_metadata.clone(),
659 ))
660 }
661 #[cfg(not(feature = "html-loader"))]
662 {
663 Err(Box::new(IngestionError::UrlIngestionUnavailable))
664 }
665 } else {
666 Ok((
667 input.clone(),
668 None,
669 Some(label.clone()),
670 external_metadata.clone(),
671 ))
672 }
673 }
674 _ => Ok((input.clone(), None, None, None)),
675 }
676}
677
678fn is_html_url_metadata(metadata: &UrlMetadata) -> bool {
679 metadata.essence == "text/html" || metadata.essence == "application/xhtml+xml"
680}
681
682fn merge_external_metadata(
683 data_item_metadata: Option<String>,
684 url_metadata: Option<&UrlMetadata>,
685) -> Result<Option<String>, serde_json::Error> {
686 let Some(metadata) = url_metadata else {
687 return Ok(data_item_metadata);
688 };
689
690 let mut merged = serde_json::Map::new();
691 let mut user_metadata_object = None;
692 let mut has_conflict = false;
693
694 if let Some(user_metadata) = data_item_metadata {
695 match serde_json::from_str::<serde_json::Value>(&user_metadata) {
696 Ok(serde_json::Value::Object(user_object)) => {
697 user_metadata_object = Some(serde_json::Value::Object(user_object.clone()));
698 for (key, value) in user_object {
699 merged.insert(key, value);
700 }
701 }
702 _ => {
703 merged.insert(
704 "data_item_external_metadata_raw".to_string(),
705 serde_json::Value::String(user_metadata),
706 );
707 }
708 }
709 }
710
711 let url_fields = [
712 ("source", serde_json::json!("url")),
713 ("url", serde_json::json!(metadata.requested_url.clone())),
714 ("final_url", serde_json::json!(metadata.final_url.clone())),
715 (
716 "content_type",
717 serde_json::json!(metadata.content_type.clone()),
718 ),
719 ];
720 for (key, value) in url_fields {
721 if merged.contains_key(key) {
722 has_conflict = true;
723 }
724 merged.insert(key.to_string(), value);
725 }
726 if let Some(title) = &metadata.title {
727 if merged.contains_key("title") {
728 has_conflict = true;
729 }
730 merged.insert("title".to_string(), serde_json::json!(title));
731 }
732 if has_conflict && let Some(user_metadata) = user_metadata_object {
733 merged.insert("data_item_external_metadata".to_string(), user_metadata);
734 }
735
736 serde_json::to_string(&serde_json::Value::Object(merged)).map(Some)
737}
738
739fn extract_name(input: &DataInput, content_hash: &str) -> String {
741 match input {
742 DataInput::Text(_) => format!("text_{content_hash}"),
743 DataInput::FilePath(path) => {
744 let clean_path = path.strip_prefix("file://").unwrap_or(path);
745 Path::new(clean_path)
746 .file_stem()
747 .and_then(|n| n.to_str())
748 .unwrap_or("unknown")
749 .to_string()
750 }
751 DataInput::Url(_) => format!("text_{content_hash}"),
752 DataInput::S3Path(path) => path
753 .split('/')
754 .next_back()
755 .unwrap_or("s3_content")
756 .to_string(),
757 DataInput::Binary { name, .. } => name.clone(),
758 DataInput::DataItem { data, .. } => extract_name(data, content_hash),
759 }
760}
761
762fn extract_original_location(input: &DataInput) -> String {
763 match input {
764 DataInput::Text(_) => "text://inline".to_string(),
765 DataInput::FilePath(path) => {
766 if path.starts_with("file://") {
767 path.clone()
768 } else {
769 format!("file://{path}")
770 }
771 }
772 DataInput::Url(url) => url.clone(),
773 DataInput::S3Path(path) => path.clone(),
774 DataInput::Binary { name, .. } => format!("binary://{name}"),
775 DataInput::DataItem { data, .. } => extract_original_location(data),
776 }
777}
778
779pub fn make_process_input_task(
786 storage: Arc<dyn StorageTrait>,
787 hash_algorithm: HashAlgorithm,
788 owner_id: Uuid,
789 tenant_id: Option<Uuid>,
790) -> TypedTask<DataInput, ProcessedInput> {
791 TypedTask::async_fn(move |input: &DataInput, _ctx| {
792 let input = input.clone();
793 let storage = Arc::clone(&storage);
794 Box::pin(async move {
795 process_input(&input, &*storage, hash_algorithm, owner_id, tenant_id)
796 .await
797 .map(Box::new)
798 .map_err(|e| format!("{e}").into())
799 })
800 })
801}
802
803pub fn make_persist_data_task(
810 database: Arc<dyn IngestDb>,
811 dataset_name: String,
812 owner_id: Uuid,
813 tenant_id: Option<Uuid>,
814) -> TypedTask<ProcessedInput, Data> {
815 make_persist_data_task_with_acl(database, dataset_name, owner_id, tenant_id, None)
816}
817
818pub fn make_persist_data_task_with_acl(
821 database: Arc<dyn IngestDb>,
822 dataset_name: String,
823 owner_id: Uuid,
824 tenant_id: Option<Uuid>,
825 acl_db: Option<Arc<dyn AclDb>>,
826) -> TypedTask<ProcessedInput, Data> {
827 make_persist_data_task_with_acl_and_params(
828 database,
829 dataset_name,
830 owner_id,
831 tenant_id,
832 acl_db,
833 AddParamsInjection::default(),
834 )
835}
836
/// Per-call overrides applied by the persist task to every `ProcessedInput` it receives.
///
/// Built by `AddPipeline::add_with_params` from [`AddParams`]. The default value applies no
/// overrides and resolves the dataset by name.
#[derive(Debug, Clone, Default)]
struct AddParamsInjection {
    /// JSON-encoded node set; when `Some`, replaces `ProcessedInput::node_set`.
    node_set_json: Option<String>,
    /// When `Some`, replaces `ProcessedInput::importance_weight`.
    importance_weight: Option<f64>,
    /// When `Some`, persist into this existing dataset instead of resolving one by name.
    target_dataset_id: Option<Uuid>,
}
849
850fn make_persist_data_task_with_acl_and_params(
854 database: Arc<dyn IngestDb>,
855 dataset_name: String,
856 owner_id: Uuid,
857 tenant_id: Option<Uuid>,
858 acl_db: Option<Arc<dyn AclDb>>,
859 add_params: AddParamsInjection,
860) -> TypedTask<ProcessedInput, Data> {
861 TypedTask::async_fn(move |processed: &ProcessedInput, _ctx| {
862 let mut processed = processed.clone();
863 if let Some(ref ns) = add_params.node_set_json {
865 processed.node_set = Some(ns.clone());
866 }
867 if let Some(w) = add_params.importance_weight {
868 processed.importance_weight = Some(w);
869 }
870 let override_ds = add_params.target_dataset_id;
871 let database = Arc::clone(&database);
872 let dataset_name = dataset_name.clone();
873 let acl_db = acl_db.clone();
874 Box::pin(async move {
875 persist_data_with_acl(
876 &processed,
877 &*database,
878 &dataset_name,
879 owner_id,
880 tenant_id,
881 acl_db.as_deref(),
882 override_ds,
883 )
884 .await
885 .map(Box::new)
886 .map_err(|e| format!("{e}").into())
887 })
888 })
889}
890
891pub fn build_add_pipeline(
897 storage: Arc<dyn StorageTrait>,
898 database: Arc<dyn IngestDb>,
899 hash_algorithm: HashAlgorithm,
900 dataset_name: &str,
901 owner_id: Uuid,
902 tenant_id: Option<Uuid>,
903) -> Pipeline {
904 build_add_pipeline_with_acl(
905 storage,
906 database,
907 hash_algorithm,
908 dataset_name,
909 owner_id,
910 tenant_id,
911 None,
912 )
913}
914
915pub fn build_add_pipeline_with_acl(
918 storage: Arc<dyn StorageTrait>,
919 database: Arc<dyn IngestDb>,
920 hash_algorithm: HashAlgorithm,
921 dataset_name: &str,
922 owner_id: Uuid,
923 tenant_id: Option<Uuid>,
924 acl_db: Option<Arc<dyn AclDb>>,
925) -> Pipeline {
926 build_add_pipeline_internal(
927 storage,
928 database,
929 hash_algorithm,
930 dataset_name,
931 owner_id,
932 tenant_id,
933 acl_db,
934 AddParamsInjection::default(),
935 )
936}
937
938#[allow(clippy::too_many_arguments)]
946fn build_add_pipeline_internal(
947 storage: Arc<dyn StorageTrait>,
948 database: Arc<dyn IngestDb>,
949 hash_algorithm: HashAlgorithm,
950 dataset_name: &str,
951 owner_id: Uuid,
952 tenant_id: Option<Uuid>,
953 acl_db: Option<Arc<dyn AclDb>>,
954 add_params: AddParamsInjection,
955) -> Pipeline {
956 let data_id_fn: DataIdFn = Arc::new(|_v: Arc<dyn Value>| None);
962 PipelineBuilder::new_with_task(
963 "ingestion.add",
964 make_process_input_task(Arc::clone(&storage), hash_algorithm, owner_id, tenant_id),
965 )
966 .add_task(make_persist_data_task_with_acl_and_params(
967 database,
968 dataset_name.to_string(),
969 owner_id,
970 tenant_id,
971 acl_db,
972 add_params,
973 ))
974 .with_name("add_pipeline")
977 .with_data_id(data_id_fn)
978 .build()
979}
980
/// High-level entry point for ingesting inputs into a dataset.
///
/// Construct with [`AddPipeline::new`] or [`AddPipeline::new_with_algorithm`], then supply
/// backends with the `with_*` builders. `add`/`add_with_params` fail with
/// [`IngestionError::MissingBackend`] unless the thread pool, graph DB, vector DB, and
/// database connection have all been set.
pub struct AddPipeline {
    /// Blob storage for ingested payloads.
    storage: Arc<dyn StorageTrait>,
    /// Relational store for datasets and `Data` rows.
    database: Arc<dyn IngestDb>,
    /// Algorithm used for content hashing (and thus `data_id` derivation).
    hash_algorithm: HashAlgorithm,
    /// Optional: when set, newly created datasets grant the owner all permissions.
    acl_db: Option<Arc<dyn AclDb>>,
    /// Required at `add` time.
    thread_pool: Option<Arc<dyn CpuPool>>,
    /// Required at `add` time.
    graph_db: Option<Arc<dyn GraphDBTrait>>,
    /// Required at `add` time.
    vector_db: Option<Arc<dyn VectorDB>>,
    /// Required at `add` time.
    db_connection: Option<Arc<DatabaseConnection>>,
    /// Optional: pipeline run tracking; a no-op repository is used when unset.
    pipeline_run_repo: Option<Arc<dyn PipelineRunRepository>>,
}
1015
1016impl AddPipeline {
1017 pub fn new(storage: Arc<dyn StorageTrait>, database: Arc<dyn IngestDb>) -> Self {
1027 Self {
1028 storage,
1029 database,
1030 hash_algorithm: HashAlgorithm::default(),
1031 acl_db: None,
1032 thread_pool: None,
1033 graph_db: None,
1034 vector_db: None,
1035 db_connection: None,
1036 pipeline_run_repo: None,
1037 }
1038 }
1039
1040 pub fn new_with_algorithm(
1042 storage: Arc<dyn StorageTrait>,
1043 database: Arc<dyn IngestDb>,
1044 hash_algorithm: HashAlgorithm,
1045 ) -> Self {
1046 Self {
1047 storage,
1048 database,
1049 hash_algorithm,
1050 acl_db: None,
1051 thread_pool: None,
1052 graph_db: None,
1053 vector_db: None,
1054 db_connection: None,
1055 pipeline_run_repo: None,
1056 }
1057 }
1058
1059 pub fn with_acl_db(mut self, acl_db: Arc<dyn AclDb>) -> Self {
1065 self.acl_db = Some(acl_db);
1066 self
1067 }
1068
1069 pub fn with_thread_pool(mut self, pool: Arc<dyn CpuPool>) -> Self {
1071 self.thread_pool = Some(pool);
1072 self
1073 }
1074
1075 pub fn with_graph_db(mut self, graph: Arc<dyn GraphDBTrait>) -> Self {
1077 self.graph_db = Some(graph);
1078 self
1079 }
1080
1081 pub fn with_vector_db(mut self, vectors: Arc<dyn VectorDB>) -> Self {
1083 self.vector_db = Some(vectors);
1084 self
1085 }
1086
1087 pub fn with_database(mut self, db: Arc<DatabaseConnection>) -> Self {
1091 self.db_connection = Some(db);
1092 self
1093 }
1094
1095 pub fn with_pipeline_run_repo(mut self, repo: Arc<dyn PipelineRunRepository>) -> Self {
1103 self.pipeline_run_repo = Some(repo);
1104 self
1105 }
1106
1107 #[instrument(
1108 name = "ingestion.add",
1109 skip(self, inputs),
1110 fields(dataset_name, owner_id = %owner_id, inputs_count = inputs.len())
1111 )]
1112 pub async fn add(
1113 &self,
1114 inputs: Vec<DataInput>,
1115 dataset_name: &str,
1116 owner_id: Uuid,
1117 tenant_id: Option<Uuid>,
1118 ) -> Result<Vec<Data>, Box<dyn std::error::Error>> {
1119 self.add_with_params(
1120 inputs,
1121 dataset_name,
1122 owner_id,
1123 tenant_id,
1124 &AddParams::default(),
1125 )
1126 .await
1127 }
1128
1129 #[instrument(
1131 name = "ingestion.add_with_params",
1132 skip(self, inputs, params),
1133 fields(dataset_name, owner_id = %owner_id, inputs_count = inputs.len())
1134 )]
1135 pub async fn add_with_params(
1136 &self,
1137 inputs: Vec<DataInput>,
1138 dataset_name: &str,
1139 owner_id: Uuid,
1140 tenant_id: Option<Uuid>,
1141 params: &AddParams,
1142 ) -> Result<Vec<Data>, Box<dyn std::error::Error>> {
1143 let thread_pool = self
1145 .thread_pool
1146 .clone()
1147 .ok_or(IngestionError::MissingBackend {
1148 which: "thread_pool",
1149 })?;
1150 let graph_db = self
1151 .graph_db
1152 .clone()
1153 .ok_or(IngestionError::MissingBackend { which: "graph_db" })?;
1154 let vector_db = self
1155 .vector_db
1156 .clone()
1157 .ok_or(IngestionError::MissingBackend { which: "vector_db" })?;
1158 let db_connection = self
1159 .db_connection
1160 .clone()
1161 .ok_or(IngestionError::MissingBackend { which: "database" })?;
1162
1163 let node_set_json = params
1165 .node_set
1166 .as_ref()
1167 .map(serde_json::to_string)
1168 .transpose()
1169 .map_err(|e| format!("Failed to serialize node_set: {e}"))?;
1170 let add_params_inj = AddParamsInjection {
1171 node_set_json,
1172 importance_weight: params.importance_weight,
1173 target_dataset_id: params.dataset_id,
1174 };
1175
1176 let pipeline = build_add_pipeline_internal(
1178 Arc::clone(&self.storage),
1179 Arc::clone(&self.database),
1180 self.hash_algorithm,
1181 dataset_name,
1182 owner_id,
1183 tenant_id,
1184 self.acl_db.clone(),
1185 add_params_inj,
1186 );
1187
1188 let pipeline_ctx = PipelineContext {
1195 pipeline_id: pipeline.id,
1196 pipeline_name: pipeline.name.clone().unwrap_or_default(),
1197 user_id: Some(owner_id),
1198 tenant_id,
1199 dataset_id: params.dataset_id,
1200 current_data: None,
1201 run_id: None,
1202 user_email: None,
1203 provenance_visited: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
1204 };
1205
1206 let (_cancel_handle, ctx) = TaskContextBuilder::new()
1207 .thread_pool(thread_pool)
1208 .database(db_connection)
1209 .graph_db(graph_db)
1210 .vector_db(vector_db)
1211 .pipeline_context(pipeline_ctx)
1212 .build()
1213 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
1214 let ctx = Arc::new(ctx);
1215
1216 let typed_inputs: Vec<Arc<dyn Value>> = inputs
1218 .into_iter()
1219 .map(|i| Arc::new(i) as Arc<dyn Value>)
1220 .collect();
1221
1222 let pipeline_run_repo = self
1226 .pipeline_run_repo
1227 .clone()
1228 .unwrap_or_else(cognee_database::NoopPipelineRunRepository::arc);
1229 let watcher = DbPipelineWatcher::new(pipeline_run_repo);
1230 let outputs = cognee_core::pipeline::execute(&pipeline, typed_inputs, ctx, &watcher)
1231 .await
1232 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
1233
1234 extract_data_outputs(outputs)
1235 }
1236}
1237
1238fn extract_data_outputs(
1249 outputs: Vec<Arc<dyn Value>>,
1250) -> Result<Vec<Data>, Box<dyn std::error::Error>> {
1251 let mut data_vec = Vec::with_capacity(outputs.len());
1252 for o in outputs {
1253 let d = (*o).as_any().downcast_ref::<Data>().cloned().ok_or(
1259 IngestionError::OutputTypeMismatch {
1260 expected: "Data",
1261 actual: "unknown",
1262 },
1263 )?;
1264 data_vec.push(d);
1265 }
1266 Ok(data_vec)
1267}
1268
/// Errors produced by the ingestion helpers and [`AddPipeline`] in this module.
#[derive(Debug, thiserror::Error)]
pub enum IngestionError {
    /// A backend required by `AddPipeline::add_with_params` was not configured. `which` names
    /// it (`thread_pool`, `graph_db`, `vector_db`, or `database`).
    #[error("AddPipeline missing required backend: {which}")]
    MissingBackend { which: &'static str },
    /// A pipeline output could not be downcast to the expected type. The only constructor in
    /// this file sets `actual` to `"unknown"`.
    #[error("AddPipeline output type mismatch: expected {expected}, actual {actual}")]
    OutputTypeMismatch {
        expected: &'static str,
        actual: &'static str,
    },
    /// A URL input was given but the crate was built without the `html-loader` feature.
    #[error("URL ingestion requires the `html-loader` feature to be enabled")]
    UrlIngestionUnavailable,
    /// S3 inputs are recognised, but ingesting them is not implemented.
    #[error("S3 ingestion is not yet implemented")]
    S3IngestionUnavailable,
    /// No loader is registered for the document type derived from the file extension.
    #[error("Unsupported document type at ingest: {document_type}")]
    UnsupportedDocumentType { document_type: String },
}
1302
1303#[cfg(test)]
1308#[allow(
1309 clippy::unwrap_used,
1310 clippy::expect_used,
1311 reason = "test code — panics are acceptable failures"
1312)]
1313mod tests {
1314 use super::*;
1315 use cognee_database::{connect, initialize, ops};
1316 use cognee_graph::MockGraphDB;
1317 #[cfg(feature = "html-loader")]
1318 use cognee_storage::LocalStorage;
1319 use cognee_storage::MockStorage;
1320 use cognee_vector::MockVectorDB;
1321 #[cfg(feature = "html-loader")]
1322 use mockito::{Server, ServerGuard};
1323 use std::io::Write;
1324 use tempfile::NamedTempFile;
1325
1326 async fn make_pipeline() -> (AddPipeline, Arc<cognee_database::DatabaseConnection>) {
1327 let db = connect("sqlite::memory:").await.unwrap();
1328 initialize(&db).await.unwrap();
1329 let db = Arc::new(db);
1330 let storage: Arc<dyn StorageTrait> = Arc::new(MockStorage::new());
1331 let pipeline = AddPipeline::new(storage, db.clone() as Arc<dyn IngestDb>)
1332 .with_thread_pool(Arc::new(RayonThreadPool::with_default_threads().unwrap()))
1333 .with_graph_db(Arc::new(MockGraphDB::new()))
1334 .with_vector_db(Arc::new(MockVectorDB::new()))
1335 .with_database(Arc::clone(&db));
1336 (pipeline, db)
1337 }
1338
1339 #[cfg(feature = "html-loader")]
1340 async fn server_with_robots() -> ServerGuard {
1341 let mut server = Server::new_async().await;
1342 server
1343 .mock("GET", "/robots.txt")
1344 .with_status(404)
1345 .create_async()
1346 .await;
1347 server
1348 }
1349
/// Fetching an HTML page by URL keeps two artifacts: the untouched HTML under
/// `raw_source_uri` (`.html`) and the extracted visible text under
/// `raw_data_uri` (`.txt`). URL provenance is recorded in `external_metadata`.
#[cfg(feature = "html-loader")]
#[tokio::test]
async fn test_process_input_url_html_stores_text_with_source_metadata() {
    const PAGE: &str = "<html><head><title>Example</title><style>.x{display:none}</style></head><body><h1>Visible text</h1><script>hidden()</script></body></html>";

    let mut mock_server = server_with_robots().await;
    let page_url = format!("{}/page.html", mock_server.url());
    let _page_mock = mock_server
        .mock("GET", "/page.html")
        .with_header("content-type", "text/html; charset=utf-8")
        .with_body(PAGE)
        .create_async()
        .await;
    let scratch = tempfile::tempdir().unwrap();
    let storage = LocalStorage::new(scratch.path().to_path_buf());

    let input = DataInput::Url(page_url.clone());
    let processed = process_input(&input, &storage, HashAlgorithm::Md5, Uuid::new_v4(), None)
        .await
        .unwrap();

    // The raw HTML copy is byte-identical and kept apart from the text payload.
    let source_uri = processed.raw_source_uri.as_ref().unwrap();
    let fetched_html = storage.retrieve(source_uri).await.unwrap();
    assert_eq!(fetched_html, PAGE.as_bytes());
    assert!(source_uri.ends_with(".html"));
    assert!(processed.raw_data_uri.ends_with(".txt"));
    assert_ne!(processed.raw_data_uri, *source_uri);

    // The text payload keeps visible content only: no markup, scripts or styles.
    let text_bytes = storage.retrieve(&processed.raw_data_uri).await.unwrap();
    let text = String::from_utf8(text_bytes).unwrap();
    assert!(text.contains("Visible text"));
    for stripped in ["<html>", "hidden()", "display:none"] {
        assert!(!text.contains(stripped));
    }
    assert_eq!(processed.stored_extension, "txt");
    assert_eq!(processed.stored_mime_type, "text/plain");
    assert_eq!(processed.original_extension, "html");
    assert_eq!(processed.original_mime_type, "text/html");
    assert_eq!(processed.loader_engine, "beautiful_soup_loader");
    assert_eq!(processed.original_location, *source_uri);

    let provenance: serde_json::Value =
        serde_json::from_str(processed.external_metadata.as_ref().unwrap()).unwrap();
    assert_eq!(provenance["source"], "url");
    assert_eq!(provenance["url"], page_url);
    assert_eq!(provenance["final_url"], page_url);
    assert_eq!(provenance["content_type"], "text/html; charset=utf-8");
    assert_eq!(provenance["title"], "Example");
}
1403
/// Persisting an HTML URL stores the extracted text as the primary payload
/// (`txt`, `text/plain`). `original_data_location` points at the untouched HTML,
/// and the original extension and MIME type are recorded alongside it.
#[cfg(feature = "html-loader")]
#[tokio::test]
async fn test_persist_data_url_html_uses_text_payload_and_raw_html_original_location() {
    const XHTML: &str = "<html><head><title>XHTML</title></head><body>XHTML body</body></html>";

    let mut mock_server = server_with_robots().await;
    let page_url = format!("{}/page", mock_server.url());
    let _page_mock = mock_server
        .mock("GET", "/page")
        .with_header("content-type", "application/xhtml+xml")
        .with_body(XHTML)
        .create_async()
        .await;

    let db = connect("sqlite::memory:").await.unwrap();
    initialize(&db).await.unwrap();
    let scratch = tempfile::tempdir().unwrap();
    let storage = LocalStorage::new(scratch.path().to_path_buf());
    let owner = Uuid::new_v4();

    let processed =
        process_input(&DataInput::Url(page_url), &storage, HashAlgorithm::Md5, owner, None)
            .await
            .unwrap();
    let row = persist_data(&processed, &db, "url-html", owner, None)
        .await
        .unwrap();

    // Primary payload: extracted text.
    assert_eq!(row.extension, "txt");
    assert_eq!(row.mime_type, "text/plain");
    assert!(row.raw_data_location.ends_with(".txt"));

    // Original location: the raw HTML, stored separately and byte-identical.
    assert!(row.original_data_location.ends_with(".html"));
    assert_ne!(row.raw_data_location, row.original_data_location);
    let original_bytes = storage.retrieve(&row.original_data_location).await.unwrap();
    assert_eq!(original_bytes, XHTML.as_bytes());
    assert_eq!(row.original_extension.as_deref(), Some("html"));
    assert_eq!(row.original_mime_type.as_deref(), Some("application/xhtml+xml"));
    assert_eq!(row.loader_engine.as_deref(), Some("beautiful_soup_loader"));
}
1454
/// A `DataItem` wrapping a URL keeps the caller's label. The caller's JSON-object
/// metadata is merged with the URL provenance fields written by the fetch.
#[cfg(feature = "html-loader")]
#[tokio::test]
async fn test_data_item_url_merges_metadata_and_preserves_label() {
    let mut mock_server = server_with_robots().await;
    let target = format!("{}/wrapped", mock_server.url());
    let _wrapped_page = mock_server
        .mock("GET", "/wrapped")
        .with_header("content-type", "text/html")
        .with_body("<html><head><title>Wrapped</title></head><body>Wrapped body</body></html>")
        .create_async()
        .await;

    let db = connect("sqlite::memory:").await.unwrap();
    initialize(&db).await.unwrap();
    let scratch = tempfile::tempdir().unwrap();
    let storage = LocalStorage::new(scratch.path().to_path_buf());
    let owner = Uuid::new_v4();

    let item = DataInput::DataItem {
        data: Box::new(DataInput::Url(target.clone())),
        label: "wrapped-label".to_string(),
        external_metadata: Some(r#"{"custom":"keep","rank":7}"#.to_string()),
    };
    let processed = process_input(&item, &storage, HashAlgorithm::Md5, owner, None)
        .await
        .unwrap();
    let row = persist_data(&processed, &db, "wrapped-url", owner, None)
        .await
        .unwrap();

    assert_eq!(row.label.as_deref(), Some("wrapped-label"));
    assert_eq!(row.extension, "txt");
    assert!(row.original_data_location.ends_with(".html"));

    let merged: serde_json::Value =
        serde_json::from_str(row.external_metadata.as_ref().unwrap()).unwrap();
    // Caller-supplied keys survive the merge...
    assert_eq!(merged["custom"], "keep");
    assert_eq!(merged["rank"], 7);
    // ...next to the URL provenance recorded by the fetch.
    let provenance = [
        ("source", "url"),
        ("url", target.as_str()),
        ("final_url", target.as_str()),
        ("content_type", "text/html"),
        ("title", "Wrapped"),
    ];
    for (key, expected) in provenance {
        assert_eq!(merged[key], expected);
    }
}
1501
/// When a `DataItem` wraps a URL and its user metadata is not a JSON object
/// (invalid JSON, or a JSON array), that metadata is kept verbatim under
/// `data_item_external_metadata_raw` next to the URL provenance fields.
///
/// Every assertion names its case, so a failure identifies which metadata
/// shape regressed.
#[cfg(feature = "html-loader")]
#[tokio::test]
async fn test_data_item_url_invalid_or_non_object_metadata_preserved_under_raw_field() {
    let mut server = server_with_robots().await;
    let cases = [
        ("/invalid-meta", "not-json"),
        ("/non-object-meta", r#"["not","an","object"]"#),
    ];

    for (path, user_metadata) in cases {
        let url = format!("{}{}", server.url(), path);
        let _mock = server
            .mock("GET", path)
            .with_header("content-type", "text/html")
            .with_body("<html><body>Metadata body</body></html>")
            .create_async()
            .await;
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = LocalStorage::new(temp_dir.path().to_path_buf());
        let owner_id = Uuid::new_v4();

        let processed = process_input(
            &DataInput::DataItem {
                data: Box::new(DataInput::Url(url.clone())),
                label: "invalid-meta".to_string(),
                external_metadata: Some(user_metadata.to_string()),
            },
            &storage,
            HashAlgorithm::Md5,
            owner_id,
            None,
        )
        .await
        .unwrap_or_else(|e| panic!("case {path}: process_input failed: {e:?}"));

        let raw_metadata = processed
            .external_metadata
            .as_deref()
            .unwrap_or_else(|| panic!("case {path}: external_metadata must be set"));
        let metadata: serde_json::Value = serde_json::from_str(raw_metadata)
            .unwrap_or_else(|e| panic!("case {path}: external_metadata is not valid JSON: {e}"));
        assert_eq!(
            metadata["data_item_external_metadata_raw"], user_metadata,
            "case {path}"
        );
        assert_eq!(metadata["source"], "url", "case {path}");
        assert_eq!(metadata["url"], url, "case {path}");
    }
}
1544
/// Non-HTML URL responses (plain text, JSON, CSV, PDF) are stored as fetched.
/// There is no separate raw-source copy, `original_location` is the URL itself,
/// and the stored extension/MIME type match each case's content type.
///
/// Every assertion names its case path, so a failure identifies which content
/// type regressed.
#[cfg(feature = "html-loader")]
#[tokio::test]
async fn test_non_html_url_inputs_do_not_store_raw_source_copy() {
    let mut server = server_with_robots().await;
    let cases = [
        ("/plain", "text/plain", "plain body", "txt", "text/plain"),
        (
            "/json",
            "application/json",
            r#"{"hello":"world"}"#,
            "json",
            "application/json",
        ),
        ("/csv", "text/csv", "a,b\n1,2\n", "csv", "text/csv"),
        (
            "/pdf",
            "application/pdf",
            "%PDF-1.7\n",
            "pdf",
            "application/pdf",
        ),
    ];

    for (path, content_type, body, expected_ext, expected_mime) in cases {
        let url = format!("{}{}", server.url(), path);
        let _mock = server
            .mock("GET", path)
            .with_header("content-type", content_type)
            .with_body(body)
            .create_async()
            .await;
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = LocalStorage::new(temp_dir.path().to_path_buf());
        let processed = process_input(
            &DataInput::Url(url.clone()),
            &storage,
            HashAlgorithm::Md5,
            Uuid::new_v4(),
            None,
        )
        .await
        .unwrap_or_else(|e| panic!("case {path}: process_input failed: {e:?}"));

        assert_eq!(
            processed.raw_source_uri, None,
            "case {path}: non-HTML input must not keep a raw source copy"
        );
        assert_eq!(processed.original_location, url, "case {path}");
        assert_eq!(processed.stored_extension, expected_ext, "case {path}");
        assert_eq!(processed.stored_mime_type, expected_mime, "case {path}");
        let stored = storage
            .retrieve(&processed.raw_data_uri)
            .await
            .unwrap_or_else(|e| panic!("case {path}: stored payload missing: {e:?}"));
        assert_eq!(stored, body.as_bytes(), "case {path}");
    }
}
1598
1599 #[tokio::test]
1600 async fn test_add_text_input() {
1601 let (pipeline, db) = make_pipeline().await;
1602 let owner_id = Uuid::new_v4();
1603
1604 let inputs = vec![DataInput::Text("Hello, world!".to_string())];
1605 let result = pipeline.add(inputs, "test_dataset", owner_id, None).await;
1606 assert!(result.is_ok(), "add should succeed: {:?}", result.err());
1607
1608 let data = result.unwrap();
1609 assert_eq!(data.len(), 1);
1610 assert!(
1612 data[0].name.starts_with("text_"),
1613 "name should start with text_"
1614 );
1615 assert_eq!(data[0].mime_type, "text/plain");
1616 assert_eq!(data[0].extension, "txt");
1617
1618 let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1619 .await
1620 .unwrap();
1621 assert_eq!(datasets.len(), 1);
1622 let ds_data = ops::datasets::get_dataset_data(&db, datasets[0].id)
1623 .await
1624 .unwrap();
1625 assert_eq!(ds_data.len(), 1);
1626 }
1627
1628 #[tokio::test]
1629 async fn test_add_file_input() {
1630 let (pipeline, db) = make_pipeline().await;
1631 let owner_id = Uuid::new_v4();
1632
1633 let mut temp_file = NamedTempFile::new().unwrap();
1634 writeln!(temp_file, "Test file content").unwrap();
1635 let file_path = temp_file.path().to_str().unwrap().to_string();
1636
1637 let inputs = vec![DataInput::FilePath(file_path)];
1638 let result = pipeline.add(inputs, "test_dataset", owner_id, None).await;
1639 assert!(result.is_ok());
1640
1641 let data = result.unwrap();
1642 assert_eq!(data.len(), 1);
1643 assert!(!data[0].name.is_empty());
1644
1645 let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1646 .await
1647 .unwrap();
1648 assert_eq!(datasets.len(), 1);
1649 }
1650
1651 #[tokio::test]
1652 async fn test_add_multiple_inputs() {
1653 let (pipeline, db) = make_pipeline().await;
1654 let owner_id = Uuid::new_v4();
1655
1656 let inputs = vec![
1657 DataInput::Text("First text".to_string()),
1658 DataInput::Text("Second text".to_string()),
1659 ];
1660 let result = pipeline.add(inputs, "test_dataset", owner_id, None).await;
1661 assert!(result.is_ok());
1662
1663 let data = result.unwrap();
1664 assert_eq!(data.len(), 2);
1665
1666 let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1667 .await
1668 .unwrap();
1669 assert_eq!(datasets.len(), 1);
1670 let ds_data = ops::datasets::get_dataset_data(&db, datasets[0].id)
1671 .await
1672 .unwrap();
1673 assert_eq!(ds_data.len(), 2);
1674 }
1675
1676 #[tokio::test]
1677 async fn test_deduplication_same_content() {
1678 let (pipeline, db) = make_pipeline().await;
1679 let owner_id = Uuid::new_v4();
1680
1681 let content = "Duplicate content";
1682 let result1 = pipeline
1683 .add(
1684 vec![DataInput::Text(content.to_string())],
1685 "test_dataset",
1686 owner_id,
1687 None,
1688 )
1689 .await
1690 .unwrap();
1691 let result2 = pipeline
1692 .add(
1693 vec![DataInput::Text(content.to_string())],
1694 "test_dataset",
1695 owner_id,
1696 None,
1697 )
1698 .await
1699 .unwrap();
1700
1701 assert_eq!(result1[0].id, result2[0].id);
1702 assert_eq!(result1[0].content_hash, result2[0].content_hash);
1703
1704 let dataset = ops::datasets::get_dataset_by_name(&db, "test_dataset", owner_id, None)
1705 .await
1706 .unwrap()
1707 .unwrap();
1708 let ds_data = ops::datasets::get_dataset_data(&db, dataset.id)
1709 .await
1710 .unwrap();
1711 assert_eq!(ds_data.len(), 1);
1712 }
1713
1714 #[tokio::test]
1715 async fn test_different_owners_same_hash_different_ids() {
1716 let (pipeline, _db) = make_pipeline().await;
1717 let owner1 = Uuid::new_v4();
1718 let owner2 = Uuid::new_v4();
1719
1720 let result1 = pipeline
1721 .add(
1722 vec![DataInput::Text("Same content".to_string())],
1723 "ds1",
1724 owner1,
1725 None,
1726 )
1727 .await
1728 .unwrap();
1729 let result2 = pipeline
1730 .add(
1731 vec![DataInput::Text("Same content".to_string())],
1732 "ds2",
1733 owner2,
1734 None,
1735 )
1736 .await
1737 .unwrap();
1738
1739 assert_eq!(
1741 result1[0].content_hash, result2[0].content_hash,
1742 "content hash is owner-independent"
1743 );
1744 assert_ne!(result1[0].id, result2[0].id, "data_id must differ by owner");
1746 }
1747
1748 #[tokio::test]
1749 async fn test_multiple_datasets() {
1750 let (pipeline, db) = make_pipeline().await;
1751 let owner_id = Uuid::new_v4();
1752
1753 pipeline
1754 .add(
1755 vec![DataInput::Text("Content 1".to_string())],
1756 "dataset1",
1757 owner_id,
1758 None,
1759 )
1760 .await
1761 .unwrap();
1762 pipeline
1763 .add(
1764 vec![DataInput::Text("Content 2".to_string())],
1765 "dataset2",
1766 owner_id,
1767 None,
1768 )
1769 .await
1770 .unwrap();
1771
1772 let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1773 .await
1774 .unwrap();
1775 assert_eq!(datasets.len(), 2);
1776 }
1777
1778 #[tokio::test]
1779 async fn test_reuse_dataset() {
1780 let (pipeline, db) = make_pipeline().await;
1781 let owner_id = Uuid::new_v4();
1782
1783 pipeline
1784 .add(
1785 vec![DataInput::Text("Content 1".to_string())],
1786 "same_dataset",
1787 owner_id,
1788 None,
1789 )
1790 .await
1791 .unwrap();
1792 pipeline
1793 .add(
1794 vec![DataInput::Text("Content 2".to_string())],
1795 "same_dataset",
1796 owner_id,
1797 None,
1798 )
1799 .await
1800 .unwrap();
1801
1802 let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1803 .await
1804 .unwrap();
1805 assert_eq!(datasets.len(), 1);
1806 let ds_data = ops::datasets::get_dataset_data(&db, datasets[0].id)
1807 .await
1808 .unwrap();
1809 assert_eq!(ds_data.len(), 2);
1810 }
1811
1812 #[tokio::test]
1813 async fn test_content_hash_deterministic() {
1814 let (pipeline, _db) = make_pipeline().await;
1815 let owner_id = Uuid::new_v4();
1816
1817 let result1 = pipeline
1818 .add(
1819 vec![DataInput::Text("Test content".to_string())],
1820 "dataset1",
1821 owner_id,
1822 None,
1823 )
1824 .await
1825 .unwrap();
1826 let result2 = pipeline
1827 .add(
1828 vec![DataInput::Text("Test content".to_string())],
1829 "dataset1",
1830 owner_id,
1831 None,
1832 )
1833 .await
1834 .unwrap();
1835
1836 assert_eq!(result1[0].content_hash, result2[0].content_hash);
1837 assert_eq!(result1[0].id, result2[0].id);
1838 }
1839
1840 #[tokio::test]
1850 async fn test_content_hash_non_empty_across_variants_and_db_roundtrip() {
1851 let (pipeline, db) = make_pipeline().await;
1852 let owner_id = Uuid::new_v4();
1853
1854 let text_data = pipeline
1856 .add(
1857 vec![DataInput::Text("Provenance audit text".to_string())],
1858 "audit_text",
1859 owner_id,
1860 None,
1861 )
1862 .await
1863 .unwrap();
1864 assert!(
1865 !text_data[0].content_hash.is_empty(),
1866 "Text input must populate content_hash"
1867 );
1868
1869 let mut temp_file = NamedTempFile::new().unwrap();
1871 writeln!(temp_file, "Provenance audit file").unwrap();
1872 let file_path = temp_file.path().to_str().unwrap().to_string();
1873 let file_data = pipeline
1874 .add(
1875 vec![DataInput::FilePath(file_path)],
1876 "audit_file",
1877 owner_id,
1878 None,
1879 )
1880 .await
1881 .unwrap();
1882 assert!(
1883 !file_data[0].content_hash.is_empty(),
1884 "FilePath input must populate content_hash"
1885 );
1886
1887 let binary_data = pipeline
1889 .add(
1890 vec![DataInput::Binary {
1891 name: "audit.bin".to_string(),
1892 data: b"provenance audit binary".to_vec(),
1893 }],
1894 "audit_binary",
1895 owner_id,
1896 None,
1897 )
1898 .await
1899 .unwrap();
1900 assert!(
1901 !binary_data[0].content_hash.is_empty(),
1902 "Binary input must populate content_hash"
1903 );
1904
1905 let wrapped = pipeline
1907 .add(
1908 vec![DataInput::DataItem {
1909 data: Box::new(DataInput::Text("Wrapped audit text".to_string())),
1910 label: "wrapped".to_string(),
1911 external_metadata: None,
1912 }],
1913 "audit_wrapped",
1914 owner_id,
1915 None,
1916 )
1917 .await
1918 .unwrap();
1919 assert!(
1920 !wrapped[0].content_hash.is_empty(),
1921 "DataItem(Text) must populate content_hash"
1922 );
1923
1924 let reread = ops::data::get_data(&db, text_data[0].id)
1928 .await
1929 .unwrap()
1930 .expect("data row exists immediately after add()");
1931 assert_eq!(
1932 reread.content_hash, text_data[0].content_hash,
1933 "content_hash must round-trip through SeaORM <-> Data conversion"
1934 );
1935 }
1936
1937 #[tokio::test]
1938 async fn test_dataset_id_is_deterministic() {
1939 let (pipeline, db) = make_pipeline().await;
1940 let owner_id = Uuid::new_v4();
1941
1942 pipeline
1943 .add(
1944 vec![DataInput::Text("any content".to_string())],
1945 "my_dataset",
1946 owner_id,
1947 None,
1948 )
1949 .await
1950 .unwrap();
1951 pipeline
1952 .add(
1953 vec![DataInput::Text("other content".to_string())],
1954 "my_dataset",
1955 owner_id,
1956 None,
1957 )
1958 .await
1959 .unwrap();
1960
1961 let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1963 .await
1964 .unwrap();
1965 assert_eq!(
1966 datasets.len(),
1967 1,
1968 "deterministic dataset ID should prevent duplicate creation"
1969 );
1970 }
1971
1972 #[tokio::test]
1973 async fn test_loader_engine_populated() {
1974 let (pipeline, _db) = make_pipeline().await;
1978 let owner_id = Uuid::new_v4();
1979
1980 let mut temp_file = NamedTempFile::new().unwrap();
1981 writeln!(temp_file, "content").unwrap();
1982 let txt_path = temp_file.path().with_extension("txt");
1983 std::fs::copy(temp_file.path(), &txt_path).unwrap();
1984
1985 let result = pipeline
1986 .add(
1987 vec![DataInput::FilePath(txt_path.to_str().unwrap().to_string())],
1988 "ds",
1989 owner_id,
1990 None,
1991 )
1992 .await
1993 .unwrap();
1994
1995 assert_eq!(result[0].loader_engine.as_deref(), Some("text_loader"));
1996 assert_eq!(result[0].extension, "txt");
1997 let _ = std::fs::remove_file(&txt_path);
1998 }
1999
2000 #[cfg(not(any(feature = "pdf-pdfium", feature = "pdf-pure-rust")))]
2004 #[tokio::test]
2005 async fn test_unsupported_loader_type_errors_at_add() {
2006 let (pipeline, _db) = make_pipeline().await;
2007 let owner_id = Uuid::new_v4();
2008
2009 let mut temp_file = NamedTempFile::new().unwrap();
2010 writeln!(temp_file, "%PDF-1.7").unwrap();
2011 let pdf_path = temp_file.path().with_extension("pdf");
2012 std::fs::copy(temp_file.path(), &pdf_path).unwrap();
2013
2014 let result = pipeline
2015 .add(
2016 vec![DataInput::FilePath(pdf_path.to_str().unwrap().to_string())],
2017 "ds",
2018 owner_id,
2019 None,
2020 )
2021 .await;
2022
2023 assert!(
2024 result.is_err(),
2025 "pdf input must error when the pdf loader feature is off"
2026 );
2027 let _ = std::fs::remove_file(&pdf_path);
2028 }
2029
2030 #[tokio::test]
2034 async fn test_text_path_no_regression_hashes_and_stored_bytes() {
2035 use cognee_storage::LocalStorage;
2036
2037 const HELLO_WORLD: &str = "hello world";
2040 const HELLO_WORLD_MD5: &str = "5eb63bbbe01eeed093cb22bb8f5acdc3";
2041
2042 let temp_dir = tempfile::tempdir().unwrap();
2043 let storage = LocalStorage::new(temp_dir.path().to_path_buf());
2044 let owner_id = Uuid::new_v4();
2045
2046 let processed = process_input(
2047 &DataInput::Text(HELLO_WORLD.to_string()),
2048 &storage,
2049 HashAlgorithm::Md5,
2050 owner_id,
2051 None,
2052 )
2053 .await
2054 .unwrap();
2055
2056 assert_eq!(processed.content_hash, HELLO_WORLD_MD5);
2058 assert_eq!(
2059 processed.data_id,
2060 generate_data_id(HELLO_WORLD_MD5, owner_id, None),
2061 "data_id must remain uuid5(content_hash + owner + tenant)"
2062 );
2063
2064 assert_eq!(
2066 processed.raw_content_hash, processed.content_hash,
2067 "plain-text raw_content_hash must equal content_hash"
2068 );
2069 assert_eq!(processed.stored_extension, "txt");
2070 assert_eq!(processed.stored_mime_type, "text/plain");
2071 assert_eq!(processed.loader_engine, "text_loader");
2072
2073 let stored = storage.retrieve(&processed.raw_data_uri).await.unwrap();
2075 assert_eq!(stored, HELLO_WORLD.as_bytes());
2076 }
2077
/// A `.csv` file goes through the CSV loader. The stored payload is the
/// extracted `key: value` text rather than the raw CSV. `content_hash` is the
/// hash of the raw file bytes, and `raw_content_hash` is the hash of the stored
/// text.
#[cfg(feature = "csv-loader")]
#[tokio::test]
async fn test_csv_path_stores_extracted_text() {
    use cognee_storage::LocalStorage;

    let csv = "name,age\nAlice,30\nBob,25\n";
    let temp_dir = tempfile::tempdir().unwrap();
    let storage = LocalStorage::new(temp_dir.path().to_path_buf());
    let owner_id = Uuid::new_v4();

    // Create the input with its `.csv` suffix directly (the expected loader
    // depends on it) so `NamedTempFile` removes it on drop, even when an
    // assertion below panics. The previous copied sibling file leaked then.
    let mut csv_file = tempfile::Builder::new()
        .suffix(".csv")
        .tempfile()
        .unwrap();
    write!(csv_file, "{csv}").unwrap();
    let csv_path = csv_file.path().to_str().unwrap().to_string();

    let processed = process_input(
        &DataInput::FilePath(csv_path),
        &storage,
        HashAlgorithm::Md5,
        owner_id,
        None,
    )
    .await
    .unwrap();

    assert_eq!(
        processed.content_hash,
        crate::content_hasher::ContentHasher::hash_content(csv.as_bytes(), HashAlgorithm::Md5)
    );
    assert_eq!(processed.stored_extension, "txt");
    assert_eq!(processed.stored_mime_type, "text/plain");
    assert_eq!(processed.original_extension, "csv");
    assert_eq!(processed.loader_engine, "csv_loader");
    assert_ne!(
        processed.raw_content_hash, processed.content_hash,
        "extracted csv text differs from raw bytes"
    );

    let stored = storage.retrieve(&processed.raw_data_uri).await.unwrap();
    let stored_text = String::from_utf8(stored).unwrap();
    assert!(stored_text.contains("name: Alice, age: 30"));
    assert!(!stored_text.contains("name,age"));

    assert_eq!(
        processed.raw_content_hash,
        crate::content_hasher::ContentHasher::hash_content(
            stored_text.as_bytes(),
            HashAlgorithm::Md5
        )
    );
}
2137
2138 #[tokio::test]
2140 async fn test_s3_input_errors_at_add() {
2141 use cognee_storage::LocalStorage;
2142
2143 let temp_dir = tempfile::tempdir().unwrap();
2144 let storage = LocalStorage::new(temp_dir.path().to_path_buf());
2145
2146 let result = process_input(
2147 &DataInput::S3Path("s3://bucket/key.txt".to_string()),
2148 &storage,
2149 HashAlgorithm::Md5,
2150 Uuid::new_v4(),
2151 None,
2152 )
2153 .await;
2154
2155 assert!(result.is_err(), "S3 input must error at ingest");
2156 }
2157
2158 #[tokio::test]
2159 async fn test_tenant_id_stored() {
2160 let (pipeline, _db) = make_pipeline().await;
2161 let owner_id = Uuid::new_v4();
2162 let tenant_id = Uuid::new_v4();
2163
2164 let result = pipeline
2165 .add(
2166 vec![DataInput::Text("tenant content".to_string())],
2167 "ds",
2168 owner_id,
2169 Some(tenant_id),
2170 )
2171 .await
2172 .unwrap();
2173
2174 assert_eq!(result[0].tenant_id, Some(tenant_id));
2175 }
2176
2177 #[tokio::test]
2178 async fn test_data_item_label_stored() {
2179 let (pipeline, _db) = make_pipeline().await;
2180 let owner_id = Uuid::new_v4();
2181
2182 let result = pipeline
2183 .add(
2184 vec![DataInput::DataItem {
2185 data: Box::new(DataInput::Text("labelled content".to_string())),
2186 label: "my-label".to_string(),
2187 external_metadata: None,
2188 }],
2189 "ds",
2190 owner_id,
2191 None,
2192 )
2193 .await
2194 .unwrap();
2195
2196 assert_eq!(result.len(), 1);
2197 assert_eq!(
2198 result[0].label.as_deref(),
2199 Some("my-label"),
2200 "DataItem label must be stored in the Data record"
2201 );
2202 }
2203
/// For a relative file path, the display name is the file stem (no directory,
/// no extension).
#[test]
fn extract_name_file_path_uses_stem_not_full_name() {
    let name = super::extract_name(&DataInput::FilePath("documents/report.txt".into()), "abc123");
    assert_eq!(
        name, "report",
        "file path name should be stem (no extension)"
    );
}
2216
/// A `file://` URI also yields just the file stem as the display name.
#[test]
fn extract_name_file_path_with_file_uri() {
    let uri_input = DataInput::FilePath("file:///tmp/data/notes.pdf".into());
    assert_eq!(super::extract_name(&uri_input, "abc123"), "notes");
}
2223
/// Text inputs have no file name, so the name is `text_` followed by the hash.
#[test]
fn extract_name_text_input_uses_hash() {
    let hash = "5eb63bbbe01eeed093cb22bb8f5acdc3";
    let name = super::extract_name(&DataInput::Text("hello world".into()), hash);
    assert_eq!(name, "text_5eb63bbbe01eeed093cb22bb8f5acdc3");
}
2230
/// A binary input named `*.md` is reported as `text/plain`, not `text/markdown`.
#[test]
fn binary_md_file_gets_text_plain_mime() {
    let markdown = DataInput::Binary {
        name: "notes.md".to_string(),
        data: b"# Heading\nSome markdown".to_vec(),
    };
    // Tuple field 2 is the MIME type.
    let mime = super::extract_file_metadata(&markdown).2;
    assert_eq!(
        mime, "text/plain",
        ".md binary should produce text/plain, not text/markdown"
    );
}
2245
/// A `.md` file path is reported as `text/plain`, not `text/markdown`.
#[test]
fn file_path_md_gets_text_plain_mime() {
    let mime = super::extract_file_metadata(&DataInput::FilePath("/tmp/notes.md".to_string())).2;
    assert_eq!(
        mime, "text/plain",
        ".md file path should produce text/plain, not text/markdown"
    );
}
2255
/// A `.json` file path is reported as `text/plain`, not `application/json`.
#[test]
fn file_path_json_gets_text_plain_mime() {
    let mime = super::extract_file_metadata(&DataInput::FilePath("/tmp/data.json".to_string())).2;
    assert_eq!(
        mime, "text/plain",
        ".json file path should produce text/plain, not application/json"
    );
}
2265
/// The text/plain override must not apply to `.pdf` paths.
#[test]
fn file_path_pdf_keeps_original_mime() {
    let mime = super::extract_file_metadata(&DataInput::FilePath("/tmp/doc.pdf".to_string())).2;
    assert_ne!(
        mime, "text/plain",
        ".pdf should NOT be overridden to text/plain"
    );
}
2275
2276 #[tokio::test]
2279 async fn test_data_item_external_metadata_stored() {
2280 let (pipeline, _db) = make_pipeline().await;
2281 let owner_id = Uuid::new_v4();
2282
2283 let meta_json = r#"{"source":"dlt","table_name":"orders"}"#.to_string();
2284 let result = pipeline
2285 .add(
2286 vec![DataInput::DataItem {
2287 data: Box::new(DataInput::Text("dlt content".to_string())),
2288 label: "dlt-label".to_string(),
2289 external_metadata: Some(meta_json.clone()),
2290 }],
2291 "ds",
2292 owner_id,
2293 None,
2294 )
2295 .await
2296 .unwrap();
2297
2298 assert_eq!(result.len(), 1);
2299 assert_eq!(
2300 result[0].external_metadata.as_deref(),
2301 Some(meta_json.as_str()),
2302 "DataItem external_metadata must be stored in the Data record"
2303 );
2304 }
2305
2306 #[tokio::test]
2307 async fn test_data_item_without_metadata() {
2308 let (pipeline, _db) = make_pipeline().await;
2309 let owner_id = Uuid::new_v4();
2310
2311 let result = pipeline
2312 .add(
2313 vec![DataInput::DataItem {
2314 data: Box::new(DataInput::Text("no metadata".to_string())),
2315 label: "plain-label".to_string(),
2316 external_metadata: None,
2317 }],
2318 "ds",
2319 owner_id,
2320 None,
2321 )
2322 .await
2323 .unwrap();
2324
2325 assert_eq!(result.len(), 1);
2326 assert_eq!(
2327 result[0].external_metadata, None,
2328 "DataItem with no external_metadata should produce None on Data"
2329 );
2330 }
2331}