Skip to main content

cognee_ingestion/
pipeline.rs

1//! Ingest pipeline built on the cognee-core [`Pipeline`] framework.
2//!
3//! Public surface:
4//! - [`ProcessedInput`] — intermediate type between the two pipeline tasks
5//! - [`process_input`] — Task 1: stream input to storage, compute hash
6//! - [`persist_data`] — Task 2: resolve dataset, deduplicate, persist record
7//! - [`make_process_input_task`] / [`make_persist_data_task`] — [`TypedTask`] wrappers
8//! - [`build_add_pipeline`] — build a composable cognee-core [`Pipeline`]
9//! - [`AddPipeline`] — convenience wrapper with a simple `add()` API
10
11use std::collections::HashMap;
12use std::path::Path;
13use std::sync::Arc;
14use tracing::{info, instrument};
15use uuid::Uuid;
16
17use cognee_core::CpuPool;
18#[cfg(test)]
19use cognee_core::RayonThreadPool;
20use cognee_core::pipeline::DataIdFn;
21use cognee_core::pipeline_run_registry::DbPipelineWatcher;
22use cognee_core::task::Value;
23use cognee_core::{Pipeline, PipelineBuilder, PipelineContext, TaskContextBuilder, TypedTask};
24use cognee_database::{AclDb, DatabaseConnection, IngestDb, PipelineRunRepository};
25use cognee_graph::GraphDBTrait;
26use cognee_models::{Data, DataInput, DataPoint, Dataset, Document};
27use cognee_storage::StorageTrait;
28use cognee_vector::VectorDB;
29
30use crate::content_hasher::HashAlgorithm;
31use crate::id_generation::{generate_data_id, generate_dataset_id};
32use crate::loader_registry::get_loader_name;
33use crate::loaders::{LoaderOutput, LoaderRegistry};
34use crate::url_resolver::UrlMetadata;
35#[cfg(feature = "html-loader")]
36use crate::url_resolver::resolve_url_input;
37
38// ---------------------------------------------------------------------------
39// AddParams
40// ---------------------------------------------------------------------------
41
/// Optional parameters for the [`AddPipeline::add`] method.
///
/// All fields default to `None`/sensible values. Use the builder methods or
/// struct literal syntax to configure.
///
/// The derived [`Default`] yields `None` for every `Option` field and `false`
/// for [`AddParams::incremental_loading`], so
/// `AddParams { dataset_id: Some(id), ..Default::default() }` is a convenient
/// way to override a single setting.
#[derive(Debug, Clone, Default)]
pub struct AddParams {
    /// List of node identifiers for graph organisation and access control grouping.
    /// Stored as a JSON string in `Data.node_set`.
    pub node_set: Option<Vec<String>>,

    /// Target an existing dataset by UUID instead of name.
    pub dataset_id: Option<Uuid>,

    /// Maps MIME types or file extensions to preferred loader names.
    pub preferred_loaders: Option<HashMap<String, String>>,

    /// Importance weight (0.0 to 1.0) for relevance scoring.
    // NOTE(review): the documented range is not enforced by this type —
    // confirm where (or whether) it is validated.
    pub importance_weight: Option<f64>,

    /// When `true`, skip content that is already present in the dataset
    /// (deduplication by content hash). Matches Python's `incremental_loading`
    /// parameter. Defaults to `false`; callers that want Python-parity should
    /// set this to `true`.
    pub incremental_loading: bool,
}
67
68// ---------------------------------------------------------------------------
69// ProcessedInput
70// ---------------------------------------------------------------------------
71
/// Metadata extracted from a [`DataInput`] during streaming processing.
///
/// Contains everything needed by [`persist_data`] to create a [`Data`] record
/// without needing the original `DataInput`.
///
/// Produced by [`process_input`]; the per-field notes below describe how that
/// function populates each value.
#[derive(Debug, Clone)]
pub struct ProcessedInput {
    /// Hash of the buffered (effective) input bytes, computed with the
    /// caller-supplied hash algorithm. [`ProcessedInput::data_id`] is derived
    /// from it.
    pub content_hash: String,
    /// MD5 of the EXTRACTED-text file (Python parity, `ingest_data.py:195`).
    /// Equals `content_hash` only when extracted text == raw bytes (plain text).
    pub raw_content_hash: String,
    /// Identifier generated from `content_hash`, `owner_id` and `tenant_id`.
    pub data_id: Uuid,
    /// Location returned by the storage backend for the stored artifact.
    pub storage_location: String,
    /// Label taken from a `DataInput::DataItem` wrapper, if the input had one.
    pub label: Option<String>,
    /// Extension describing the stored artifact (`"txt"` once a loader has
    /// extracted text at ADD time).
    pub stored_extension: String,
    /// MIME type describing the stored artifact (`"text/plain"` once a loader
    /// has extracted text at ADD time).
    pub stored_mime_type: String,
    /// Extension of the source input, before any text extraction.
    pub original_extension: String,
    /// MIME type of the source input, before any text extraction.
    pub original_mime_type: String,
    /// Name of the loader associated with this input.
    pub loader_engine: String,
    /// Number of bytes buffered from the effective input.
    pub data_size: i64,
    /// Human-readable name for the record (see `extract_name`).
    pub name: String,
    /// URI form of `storage_location` (see `storage_location_to_uri`).
    pub raw_data_uri: String,
    /// Where the content originally came from; falls back to the stored URI
    /// for inline text and to `raw_source_uri` when that is present.
    pub original_location: String,
    /// URI of the separately stored raw source bytes; only set for HTML URL
    /// inputs.
    pub raw_source_uri: Option<String>,
    /// Owner passed to [`process_input`], carried through unchanged.
    pub owner_id: Uuid,
    /// Tenant passed to [`process_input`], carried through unchanged.
    pub tenant_id: Option<Uuid>,
    /// Caller-supplied `DataItem` metadata, merged with URL metadata when the
    /// input was a URL.
    pub external_metadata: Option<String>,
    /// JSON-serialized node set identifiers for graph organisation.
    /// [`process_input`] always leaves this as `None`.
    pub node_set: Option<String>,
    /// Importance weight for ranking (0.0 to 1.0).
    /// [`process_input`] always leaves this as `None`.
    pub importance_weight: Option<f64>,
}
103
104// ---------------------------------------------------------------------------
105// Task 1 implementation: DataInput → ProcessedInput
106// ---------------------------------------------------------------------------
107
/// Process a single [`DataInput`]: resolve URLs, stream to storage, compute
/// content hash, and extract all metadata needed to create a [`Data`] record.
///
/// This is the first step of the ingest pipeline (Task 1).
///
/// The steps run in a fixed order that later steps depend on:
/// 1. resolve the input (URL inputs are fetched and replaced by extracted text),
/// 2. derive file name / extension / MIME / label / loader metadata,
/// 3. reject S3 inputs,
/// 4. for HTML URL inputs, store the raw source bytes separately,
/// 5. buffer the effective input, hash it and derive `data_id`,
/// 6. store the artifact (verbatim for URLs and non-UTF-8 generic blobs,
///    loader-extracted text otherwise) and compute `raw_content_hash`,
/// 7. assemble the [`ProcessedInput`].
///
/// The returned value always has `node_set` and `importance_weight` set to
/// `None`; this function does not populate them.
///
/// # Errors
///
/// Returns an error when input resolution fails, when the (unwrapped) input is
/// a `DataInput::S3Path`, when no loader is registered for the derived
/// document type, when the loader or the storage backend fails, when reading
/// the input in chunks fails, or when the merged external metadata cannot be
/// serialized.
#[instrument(name = "ingestion.process_input", skip(input, storage))]
pub async fn process_input(
    input: &DataInput,
    storage: &dyn StorageTrait,
    hash_algorithm: HashAlgorithm,
    owner_id: Uuid,
    tenant_id: Option<Uuid>,
) -> Result<ProcessedInput, Box<dyn std::error::Error>> {
    use tokio::sync::Mutex;

    let (effective_input, resolved_url_metadata, resolved_label, data_item_metadata) =
        resolve_input_for_processing(input).await?;

    // Determine filename and metadata before streaming.
    // For URL inputs the Content-Type-derived metadata takes precedence.
    let (
        file_name,
        stored_extension,
        stored_mime_type,
        original_extension,
        original_mime_type,
        label,
        loader_engine,
    ) = if let Some(metadata) = resolved_url_metadata.as_ref() {
        let fname = format!("text_placeholder.{}", metadata.stored_extension);
        (
            fname,
            metadata.stored_extension.clone(),
            metadata.stored_mime_type.clone(),
            metadata.source_extension.clone(),
            metadata.source_mime_type.clone(),
            resolved_label,
            metadata.loader_engine.clone(),
        )
    } else {
        // Non-URL input: stored and original metadata start out identical; the
        // stored side is overridden further down once a loader has run.
        let (fname, ext, mime, lbl) = extract_file_metadata(input);
        let loader = get_loader_name(&ext).to_string();
        (fname, ext.clone(), mime.clone(), ext, mime, lbl, loader)
    };

    // S3 ingestion is not yet implemented (decision D3): the loader-dispatch
    // path below must NOT silently store raw S3 bytes. Surface a clear error
    // instead. The `original_*` metadata above still carries the placeholder.
    if matches!(unwrap_data_item(input), DataInput::S3Path(_)) {
        return Err(Box::new(IngestionError::S3IngestionUnavailable));
    }

    // For HTML URL inputs, keep the raw fetched bytes as a separate stored
    // file so the original source remains available next to the extracted text.
    let raw_source_uri = if let Some(metadata) = resolved_url_metadata.as_ref()
        && is_html_url_metadata(metadata)
    {
        let raw_file_name = format!("source_placeholder.{}", metadata.source_extension);
        let raw_location = storage.store(&metadata.raw_bytes, &raw_file_name).await?;
        Some(storage_location_to_uri(storage.base_path(), &raw_location))
    } else {
        None
    };

    // Rebindable stored-file metadata. For non-URL inputs the loader runs at
    // ADD time (Python parity, ingest_data.py:103) and the stored artifact
    // becomes extracted text, so these are overridden to txt/text/plain below.
    let mut stored_extension = stored_extension;
    let mut stored_mime_type = stored_mime_type;
    let mut loader_engine = loader_engine;

    // Buffer the (effective) input bytes. For URL inputs `effective_input` is
    // the already-extracted text; for everything else it is the raw original
    // bytes, which we hash (UNCHANGED — sacred) and then feed to the loader.
    //
    // The accumulators are shared through `Arc<Mutex<_>>` because the chunk
    // callback is a `move` closure that returns an `async move` future.
    let size_counter: Arc<Mutex<i64>> = Arc::new(Mutex::new(0i64));
    let raw_bytes: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));

    let size_clone = size_counter.clone();
    let raw_bytes_clone = raw_bytes.clone();

    effective_input
        .process_by_chunks(move |chunk| {
            let size = size_clone.clone();
            let bytes = raw_bytes_clone.clone();
            // Copy the chunk so the returned future owns its data.
            let chunk_owned = chunk.to_vec();
            async move {
                *size.lock().await += chunk_owned.len() as i64;
                bytes.lock().await.extend_from_slice(&chunk_owned);
                Ok::<(), Box<dyn std::error::Error>>(())
            }
        })
        .await?;

    // Finalise hash — content-only, no owner_id (Python compatible). For
    // non-URL inputs this is the MD5 of the RAW original bytes; `data_id` is
    // derived from it. Both are SACRED and must not change.
    //
    // `Arc::try_unwrap` fails if another clone of the accumulator is still
    // alive at this point; that case is surfaced as an error, not a panic.
    let collected = Arc::try_unwrap(raw_bytes)
        .map_err(|_| "Failed to unwrap bytes")?
        .into_inner();
    let content_hash =
        crate::content_hasher::ContentHasher::hash_content(&collected, hash_algorithm);
    // NOTE(review): `data_size` is the byte count of the buffered input, not
    // of the stored artifact (which may be loader-extracted text) — confirm
    // this is the intended meaning.
    let data_size = Arc::try_unwrap(size_counter)
        .map_err(|_| "Failed to unwrap size counter")?
        .into_inner();

    let data_id = generate_data_id(&content_hash, owner_id, tenant_id);

    // Store the artifact + compute `raw_content_hash` (MD5 of the stored
    // extracted-text file — Python parity, ingest_data.py:195).
    let (storage_location, raw_content_hash) = if resolved_url_metadata.is_some() {
        // URL path: the loader already ran at fetch time, so `collected` IS the
        // extracted text. Store it verbatim under the metadata-derived name and
        // extension (preserving the existing URL behaviour, incl. non-HTML
        // URLs that keep their source extension). `raw_content_hash` is the MD5
        // of that stored file, which equals `content_hash` here.
        let location = storage.store(&collected, &file_name).await?;
        (location, content_hash.clone())
    } else {
        // Non-URL path: run the document loader at ADD time and store the
        // EXTRACTED text as `text_<content_hash>.txt` (Python text_loader.py:76
        // names the file with the ORIGINAL file's content hash).
        let registry = LoaderRegistry::default_registry();
        let doc_type = cognee_models::doc_type_for_extension(&original_extension)
            .unwrap_or("text")
            .to_string();

        // Binary/non-text blob fallback (pre-task-17 raw-storage parity).
        //
        // Rust classifies by extension only, so a file with no (or an unknown)
        // extension falls back to `doc_type = "text"` and the text loader. The
        // text loader rejects non-UTF-8 bytes, which would make `add` fail for
        // generic binary blobs that have no specific loader — a regression
        // introduced by running loaders at ADD time (task 17).
        //
        // Match the pre-task-17 behaviour: when the would-be loader is the
        // *text* loader but the bytes are not valid UTF-8 text, store the RAW
        // bytes verbatim instead of running text extraction. This only affects
        // the generic-text fallback path: files routed to a real loader by
        // their extension (text/csv/html/pdf — actually-text content; or
        // image/audio — `UnsupportedDocumentType`) are unchanged.
        let is_text_doc_type = doc_type == "text";
        if is_text_doc_type && std::str::from_utf8(&collected).is_err() {
            // The stored-file metadata is NOT rebound on this path: it keeps
            // the values derived from the original input.
            let stored_name = format!("text_{content_hash}.bin");
            let location = storage.store(&collected, &stored_name).await?;
            // Raw bytes are stored verbatim, so `raw_content_hash` equals the
            // (content-only) `content_hash` of those same bytes.
            (location, content_hash.clone())
        } else {
            let loader = registry.get(&doc_type).ok_or_else(|| {
                Box::new(IngestionError::UnsupportedDocumentType {
                    document_type: doc_type.clone(),
                }) as Box<dyn std::error::Error>
            })?;

            // Minimal Document descriptor for the loader (only metadata fields
            // the loaders read; the bytes are passed separately).
            let descriptor = build_loader_descriptor(
                data_id,
                &extract_name(input, &content_hash),
                &original_extension,
                &original_mime_type,
            );

            // Flatten every loader output shape into a single text blob; row
            // outputs are joined with a blank line between rows.
            let extracted_text = match loader.extract(&collected, &descriptor).await? {
                LoaderOutput::Text(t) => t,
                LoaderOutput::Rows(rows) => rows.join("\n\n"),
                LoaderOutput::SingleChunk { text, .. } => text,
            };
            let extracted_bytes = extracted_text.into_bytes();

            let stored_name = format!("text_{content_hash}.txt");
            let location = storage.store(&extracted_bytes, &stored_name).await?;

            // Stored-file metadata now describes the extracted text.
            stored_extension = "txt".to_string();
            stored_mime_type = "text/plain".to_string();
            loader_engine = loader.engine_name().to_string();

            let raw_content_hash = crate::content_hasher::ContentHasher::hash_content(
                &extracted_bytes,
                hash_algorithm,
            );
            (location, raw_content_hash)
        }
    };

    // Compute derived fields that previously lived in add()
    let raw_data_uri = storage_location_to_uri(storage.base_path(), &storage_location);
    let name = extract_name(input, &content_hash);
    // Precedence for the original location: the stored raw source (HTML URLs)
    // wins, inline text points at its own stored file, everything else reports
    // where the input came from.
    let original_location = if let Some(uri) = raw_source_uri.clone() {
        uri
    } else {
        // NOTE(review): this matches on `input` without unwrapping a
        // `DataItem`, so text wrapped in a `DataItem` falls through to
        // `extract_original_location` and gets "text://inline" instead of the
        // stored URI that bare text gets — confirm whether that is intended.
        match input {
            DataInput::Text(_) => raw_data_uri.clone(),
            _ => extract_original_location(input),
        }
    };
    let external_metadata =
        merge_external_metadata(data_item_metadata, resolved_url_metadata.as_ref())?;

    Ok(ProcessedInput {
        content_hash,
        raw_content_hash,
        data_id,
        storage_location,
        label,
        stored_extension,
        stored_mime_type,
        original_extension,
        original_mime_type,
        loader_engine: loader_engine.to_string(),
        data_size,
        name,
        raw_data_uri,
        original_location,
        raw_source_uri,
        owner_id,
        tenant_id,
        external_metadata,
        // Not known at this stage; left unset here.
        node_set: None,
        importance_weight: None,
    })
}
328
329// ---------------------------------------------------------------------------
330// Task 2 implementation: ProcessedInput → Data
331// ---------------------------------------------------------------------------
332
333/// Persist a [`ProcessedInput`] as a [`Data`] record: resolve or create the
334/// dataset, deduplicate by content hash, create the record if new, and attach
335/// it to the dataset.
336///
337/// Dataset resolution uses a deterministic UUID5 ID so the lookup + optional
338/// `INSERT OR IGNORE` is idempotent and cheap — safe to call once per item.
339///
340/// This is the second step of the ingest pipeline (Task 2).
341#[instrument(
342    name = "ingestion.persist_data",
343    skip(processed, database),
344    fields(data_id = %processed.data_id)
345)]
346pub async fn persist_data(
347    processed: &ProcessedInput,
348    database: &dyn IngestDb,
349    dataset_name: &str,
350    owner_id: Uuid,
351    tenant_id: Option<Uuid>,
352) -> Result<Data, Box<dyn std::error::Error>> {
353    persist_data_with_acl(
354        processed,
355        database,
356        dataset_name,
357        owner_id,
358        tenant_id,
359        None,
360        None,
361    )
362    .await
363}
364
365/// Like [`persist_data`], but optionally grants all four ACL permissions
366/// (read, write, delete, share) to the owner when a new dataset is created.
367///
368/// When `acl_db` is `Some`, the owner is ensured as a principal and receives
369/// all permissions on newly created datasets, matching Python's
370/// `create_authorized_dataset()` behavior.
371///
372/// When `target_dataset_id` is `Some`, the pipeline looks up the dataset by UUID
373/// instead of name, allowing callers to target a specific existing dataset.
374#[instrument(
375    name = "ingestion.persist_data_with_acl",
376    skip(processed, database, acl_db),
377    fields(data_id = %processed.data_id)
378)]
379pub async fn persist_data_with_acl(
380    processed: &ProcessedInput,
381    database: &dyn IngestDb,
382    dataset_name: &str,
383    owner_id: Uuid,
384    tenant_id: Option<Uuid>,
385    acl_db: Option<&dyn AclDb>,
386    target_dataset_id: Option<Uuid>,
387) -> Result<Data, Box<dyn std::error::Error>> {
388    // Resolve the dataset: prefer explicit UUID, fall back to name-based lookup.
389    let is_new_dataset;
390    let dataset = if let Some(ds_id) = target_dataset_id {
391        match database.get_dataset(ds_id).await? {
392            Some(ds) => {
393                is_new_dataset = false;
394                ds
395            }
396            None => {
397                return Err(format!("Dataset with id {ds_id} not found").into());
398            }
399        }
400    } else {
401        let generated_id = generate_dataset_id(dataset_name, owner_id, tenant_id);
402        match database
403            .get_dataset_by_name(dataset_name, owner_id, tenant_id)
404            .await?
405        {
406            Some(ds) => {
407                is_new_dataset = false;
408                ds
409            }
410            None => {
411                is_new_dataset = true;
412                let new_dataset =
413                    Dataset::new(dataset_name.to_string(), owner_id, tenant_id, generated_id);
414                database.create_dataset(new_dataset).await?
415            }
416        }
417    };
418    info!(dataset_id = %dataset.id, "dataset resolved");
419
420    // Grant all permissions to the owner when a new dataset is created.
421    if is_new_dataset && let Some(acl) = acl_db {
422        cognee_database::ops::acl::grant_all_permissions_on_dataset_via_trait(
423            acl, owner_id, dataset.id,
424        )
425        .await?;
426        info!(
427            dataset_id = %dataset.id,
428            owner_id = %owner_id,
429            "ACL permissions granted on new dataset"
430        );
431    }
432
433    let data_id = processed.data_id;
434
435    if let Some(existing_data) = database.get_data(data_id).await? {
436        database.attach_data_to_dataset(dataset.id, data_id).await?;
437        info!(data_id = %data_id, is_duplicate = true, "input processed");
438        return Ok(existing_data);
439    }
440
441    let mut data_builder = Data::builder(
442        data_id,
443        processed.name.clone(),
444        processed.raw_data_uri.clone(),
445        processed.original_location.clone(),
446        processed.stored_extension.clone(),
447        processed.stored_mime_type.clone(),
448        processed.content_hash.clone(),
449        processed.owner_id,
450    )
451    .original_extension(processed.original_extension.clone())
452    .original_mime_type(processed.original_mime_type.clone())
453    .loader_engine(processed.loader_engine.clone())
454    .raw_content_hash(processed.raw_content_hash.clone())
455    .data_size(processed.data_size);
456    if let Some(tid) = processed.tenant_id {
457        data_builder = data_builder.tenant_id(tid);
458    }
459    if let Some(ref lbl) = processed.label {
460        data_builder = data_builder.label(lbl.clone());
461    }
462    if let Some(ref meta) = processed.external_metadata {
463        data_builder = data_builder.external_metadata(meta.clone());
464    }
465    if let Some(ref ns) = processed.node_set {
466        data_builder = data_builder.node_set(ns.clone());
467    }
468    if let Some(w) = processed.importance_weight {
469        data_builder = data_builder.importance_weight(w);
470    }
471    let data = data_builder.build();
472
473    let saved_data = database.create_data(data).await?;
474
475    database.attach_data_to_dataset(dataset.id, data_id).await?;
476
477    info!(data_id = %data_id, is_duplicate = false, "input processed");
478    Ok(saved_data)
479}
480
481// ---------------------------------------------------------------------------
482// Private helpers
483// ---------------------------------------------------------------------------
484
485/// Resolve the MIME type for a file extension.
486///
487/// If the extension maps to `"text_loader"` in the loader registry, return
488/// `"text/plain"` to match Python's behaviour (Python's `filetype.guess()`
489/// returns `text/plain` for `.md`, `.json`, `.xml`, etc. because they have no
490/// magic bytes). Otherwise fall back to `mime_guess`.
491fn resolve_mime(extension: &str, path_for_guess: &str) -> String {
492    if get_loader_name(extension) == "text_loader" {
493        "text/plain".to_string()
494    } else {
495        mime_guess::from_path(path_for_guess)
496            .first_or_octet_stream()
497            .to_string()
498    }
499}
500
501/// Unwrap a [`DataInput::DataItem`] to its underlying input, returning other
502/// variants unchanged. Used to detect the underlying variant (e.g. `S3Path`)
503/// regardless of `DataItem` wrapping.
504fn unwrap_data_item(input: &DataInput) -> &DataInput {
505    match input {
506        DataInput::DataItem { data, .. } => unwrap_data_item(data),
507        other => other,
508    }
509}
510
511/// Build a minimal [`Document`] descriptor passed to a [`DocumentLoader`] at
512/// ADD time. Only the metadata fields loaders read are populated; the content
513/// bytes are passed separately to `extract`.
514fn build_loader_descriptor(
515    data_id: Uuid,
516    name: &str,
517    extension: &str,
518    mime_type: &str,
519) -> Document {
520    let doc_type = cognee_models::doc_type_for_extension(extension)
521        .unwrap_or("text")
522        .to_string();
523    let mut base = DataPoint::new("Document", None);
524    base.id = data_id;
525    Document {
526        base,
527        document_type: doc_type,
528        name: name.to_string(),
529        raw_data_location: String::new(),
530        mime_type: mime_type.to_string(),
531        extension: extension.to_string(),
532        data_id,
533        external_metadata: None,
534    }
535}
536
537/// Return `(file_name, extension, mime_type, label)` for the given input.
538fn extract_file_metadata(input: &DataInput) -> (String, String, String, Option<String>) {
539    match input {
540        DataInput::FilePath(path) => {
541            let clean_path = path.strip_prefix("file://").unwrap_or(path);
542            let p = Path::new(clean_path);
543            let file_name = p
544                .file_name()
545                .and_then(|n| n.to_str())
546                .unwrap_or("file.bin")
547                .to_string();
548            let extension = p
549                .extension()
550                .and_then(|e| e.to_str())
551                .unwrap_or("")
552                .to_string();
553            let mime = resolve_mime(&extension, clean_path);
554            (file_name, extension, mime, None)
555        }
556        DataInput::Text(_) => {
557            // Will be renamed to text_<hash>.txt after hashing; use placeholder for now
558            (
559                "text_placeholder.txt".to_string(),
560                "txt".to_string(),
561                "text/plain".to_string(),
562                None,
563            )
564        }
565        DataInput::Url(_url) => {
566            // Fetched HTML is extracted to plain text and stored as text_<hash>.txt.
567            // Extension and MIME reflect the original source (HTML), loader = beautiful_soup_loader.
568            (
569                "text_placeholder.txt".to_string(),
570                "html".to_string(),
571                "text/html".to_string(),
572                None,
573            )
574        }
575        // TODO(COG-4456): replace placeholder metadata when S3 ingestion is implemented.
576        DataInput::S3Path(_) => (
577            "s3_file.bin".to_string(),
578            "bin".to_string(),
579            "application/octet-stream".to_string(),
580            None,
581        ),
582        DataInput::Binary { name, .. } => {
583            let ext = Path::new(name)
584                .extension()
585                .and_then(|e| e.to_str())
586                .unwrap_or("bin")
587                .to_string();
588            let mime = resolve_mime(&ext, name);
589            (name.clone(), ext, mime, None)
590        }
591        DataInput::DataItem { data, label, .. } => {
592            let (file_name, ext, mime, _) = extract_file_metadata(data);
593            (file_name, ext, mime, Some(label.clone()))
594        }
595    }
596}
597
/// Turn a storage-relative location into an absolute `file://` URI.
///
/// Mirrors Python's `Path(full_file_path).as_uri()`, which always produces an
/// absolute `file:///…` URI:
///
/// * An empty `base_path` (MockStorage or another non-filesystem backend)
///   returns `location` unchanged, with no scheme added.
/// * Otherwise `location` is joined onto `base_path`. If the result is still
///   relative it is prefixed with the current working directory (or with an
///   empty path if the working directory cannot be determined), so the URI
///   stored in the database is self-contained.
///
/// The path is joined lexically only: nothing is resolved against the
/// filesystem, so the target does not need to exist.
fn storage_location_to_uri(base_path: &str, location: &str) -> String {
    if base_path.is_empty() {
        return location.to_string();
    }

    let mut absolute = Path::new(base_path).join(location);
    if !absolute.is_absolute() {
        absolute = std::env::current_dir()
            .unwrap_or_default()
            .join(&absolute);
    }
    format!("file://{}", absolute.display())
}
620
/// Prepare an input for processing, resolving URL inputs when possible.
///
/// Returns `(effective_input, url_metadata, label, external_metadata)`:
///
/// * `effective_input` — the input to stream: the resolved content for URL
///   inputs, otherwise a clone of `input` (wrapper included).
/// * `url_metadata` — `Some` only when a URL was resolved.
/// * `label` / `external_metadata` — taken from a `DataInput::DataItem`
///   wrapper; both are `None` for unwrapped inputs.
///
/// Only a bare `Url` or a `Url` directly inside one `DataItem` is resolved.
///
/// # Errors
///
/// With the `html-loader` feature, propagates any error from
/// `resolve_url_input`. Without it, URL inputs fail with
/// `IngestionError::UrlIngestionUnavailable`.
async fn resolve_input_for_processing(
    input: &DataInput,
) -> Result<
    (
        DataInput,
        Option<UrlMetadata>,
        Option<String>,
        Option<String>,
    ),
    Box<dyn std::error::Error>,
> {
    match input {
        DataInput::Url(url) => {
            #[cfg(feature = "html-loader")]
            {
                let resolved = resolve_url_input(url).await?;
                Ok((resolved.input, Some(resolved.metadata), None, None))
            }
            #[cfg(not(feature = "html-loader"))]
            {
                // Touch `url` so the binding is not reported as unused when
                // the feature is disabled.
                let _ = url;
                Err(Box::new(IngestionError::UrlIngestionUnavailable))
            }
        }
        DataInput::DataItem {
            data,
            label,
            external_metadata,
        } => {
            // `_url` carries a leading underscore because it is unused when
            // the `html-loader` feature is disabled.
            if let DataInput::Url(_url) = data.as_ref() {
                #[cfg(feature = "html-loader")]
                {
                    let resolved = resolve_url_input(_url).await?;
                    Ok((
                        resolved.input,
                        Some(resolved.metadata),
                        Some(label.clone()),
                        external_metadata.clone(),
                    ))
                }
                #[cfg(not(feature = "html-loader"))]
                {
                    Err(Box::new(IngestionError::UrlIngestionUnavailable))
                }
            } else {
                // Non-URL payload: keep the whole wrapper as the effective
                // input and surface its label and metadata.
                Ok((
                    input.clone(),
                    None,
                    Some(label.clone()),
                    external_metadata.clone(),
                ))
            }
        }
        _ => Ok((input.clone(), None, None, None)),
    }
}
677
678fn is_html_url_metadata(metadata: &UrlMetadata) -> bool {
679    metadata.essence == "text/html" || metadata.essence == "application/xhtml+xml"
680}
681
682fn merge_external_metadata(
683    data_item_metadata: Option<String>,
684    url_metadata: Option<&UrlMetadata>,
685) -> Result<Option<String>, serde_json::Error> {
686    let Some(metadata) = url_metadata else {
687        return Ok(data_item_metadata);
688    };
689
690    let mut merged = serde_json::Map::new();
691    let mut user_metadata_object = None;
692    let mut has_conflict = false;
693
694    if let Some(user_metadata) = data_item_metadata {
695        match serde_json::from_str::<serde_json::Value>(&user_metadata) {
696            Ok(serde_json::Value::Object(user_object)) => {
697                user_metadata_object = Some(serde_json::Value::Object(user_object.clone()));
698                for (key, value) in user_object {
699                    merged.insert(key, value);
700                }
701            }
702            _ => {
703                merged.insert(
704                    "data_item_external_metadata_raw".to_string(),
705                    serde_json::Value::String(user_metadata),
706                );
707            }
708        }
709    }
710
711    let url_fields = [
712        ("source", serde_json::json!("url")),
713        ("url", serde_json::json!(metadata.requested_url.clone())),
714        ("final_url", serde_json::json!(metadata.final_url.clone())),
715        (
716            "content_type",
717            serde_json::json!(metadata.content_type.clone()),
718        ),
719    ];
720    for (key, value) in url_fields {
721        if merged.contains_key(key) {
722            has_conflict = true;
723        }
724        merged.insert(key.to_string(), value);
725    }
726    if let Some(title) = &metadata.title {
727        if merged.contains_key("title") {
728            has_conflict = true;
729        }
730        merged.insert("title".to_string(), serde_json::json!(title));
731    }
732    if has_conflict && let Some(user_metadata) = user_metadata_object {
733        merged.insert("data_item_external_metadata".to_string(), user_metadata);
734    }
735
736    serde_json::to_string(&serde_json::Value::Object(merged)).map(Some)
737}
738
739/// Derive a human-readable name for the stored Data record.
740fn extract_name(input: &DataInput, content_hash: &str) -> String {
741    match input {
742        DataInput::Text(_) => format!("text_{content_hash}"),
743        DataInput::FilePath(path) => {
744            let clean_path = path.strip_prefix("file://").unwrap_or(path);
745            Path::new(clean_path)
746                .file_stem()
747                .and_then(|n| n.to_str())
748                .unwrap_or("unknown")
749                .to_string()
750        }
751        DataInput::Url(_) => format!("text_{content_hash}"),
752        DataInput::S3Path(path) => path
753            .split('/')
754            .next_back()
755            .unwrap_or("s3_content")
756            .to_string(),
757        DataInput::Binary { name, .. } => name.clone(),
758        DataInput::DataItem { data, .. } => extract_name(data, content_hash),
759    }
760}
761
762fn extract_original_location(input: &DataInput) -> String {
763    match input {
764        DataInput::Text(_) => "text://inline".to_string(),
765        DataInput::FilePath(path) => {
766            if path.starts_with("file://") {
767                path.clone()
768            } else {
769                format!("file://{path}")
770            }
771        }
772        DataInput::Url(url) => url.clone(),
773        DataInput::S3Path(path) => path.clone(),
774        DataInput::Binary { name, .. } => format!("binary://{name}"),
775        DataInput::DataItem { data, .. } => extract_original_location(data),
776    }
777}
778
779// ---------------------------------------------------------------------------
780// Task 1 wrapper: DataInput → ProcessedInput
781// ---------------------------------------------------------------------------
782
783/// Build a [`TypedTask`] that streams a [`DataInput`] to storage, hashes its
784/// content, and returns a self-contained [`ProcessedInput`].
785pub fn make_process_input_task(
786    storage: Arc<dyn StorageTrait>,
787    hash_algorithm: HashAlgorithm,
788    owner_id: Uuid,
789    tenant_id: Option<Uuid>,
790) -> TypedTask<DataInput, ProcessedInput> {
791    TypedTask::async_fn(move |input: &DataInput, _ctx| {
792        let input = input.clone();
793        let storage = Arc::clone(&storage);
794        Box::pin(async move {
795            process_input(&input, &*storage, hash_algorithm, owner_id, tenant_id)
796                .await
797                .map(Box::new)
798                .map_err(|e| format!("{e}").into())
799        })
800    })
801}
802
803// ---------------------------------------------------------------------------
804// Task 2 wrapper: ProcessedInput → Data
805// ---------------------------------------------------------------------------
806
807/// Build a [`TypedTask`] that resolves or creates the dataset, deduplicates by
808/// content hash, persists a new [`Data`] record if needed, and returns it.
809pub fn make_persist_data_task(
810    database: Arc<dyn IngestDb>,
811    dataset_name: String,
812    owner_id: Uuid,
813    tenant_id: Option<Uuid>,
814) -> TypedTask<ProcessedInput, Data> {
815    make_persist_data_task_with_acl(database, dataset_name, owner_id, tenant_id, None)
816}
817
818/// Like [`make_persist_data_task`], but optionally grants ACL permissions
819/// on newly created datasets.
820pub fn make_persist_data_task_with_acl(
821    database: Arc<dyn IngestDb>,
822    dataset_name: String,
823    owner_id: Uuid,
824    tenant_id: Option<Uuid>,
825    acl_db: Option<Arc<dyn AclDb>>,
826) -> TypedTask<ProcessedInput, Data> {
827    make_persist_data_task_with_acl_and_params(
828        database,
829        dataset_name,
830        owner_id,
831        tenant_id,
832        acl_db,
833        AddParamsInjection::default(),
834    )
835}
836
/// Pre-serialised [`AddParams`] payload injected into [`ProcessedInput`] by
/// the persist closure. Constructed once per pipeline so the cost of
/// serialising `node_set` is paid up-front, not per item.
///
/// Locked Decision 7 (LIB-06) — `AddParams` is wired in via the task
/// closure rather than a `RunSpec` / `TaskContext` extension.
///
/// The `Default` value (all fields `None`) makes the persist task leave the
/// [`ProcessedInput`] untouched and pass no dataset override.
#[derive(Debug, Clone, Default)]
struct AddParamsInjection {
    /// JSON text of `AddParams::node_set`; when `Some`, it replaces
    /// `ProcessedInput::node_set` on every item.
    node_set_json: Option<String>,
    /// Copied from `AddParams::importance_weight`; when `Some`, it replaces
    /// `ProcessedInput::importance_weight` on every item.
    importance_weight: Option<f64>,
    /// Copied from `AddParams::dataset_id`; forwarded to
    /// `persist_data_with_acl` as the dataset-id override.
    target_dataset_id: Option<Uuid>,
}
849
850/// Build a persist task whose closure also patches the [`ProcessedInput`]
851/// with the [`AddParams`] fields (`node_set`, `importance_weight`) and
852/// honours an optional `dataset_id` override.
853fn make_persist_data_task_with_acl_and_params(
854    database: Arc<dyn IngestDb>,
855    dataset_name: String,
856    owner_id: Uuid,
857    tenant_id: Option<Uuid>,
858    acl_db: Option<Arc<dyn AclDb>>,
859    add_params: AddParamsInjection,
860) -> TypedTask<ProcessedInput, Data> {
861    TypedTask::async_fn(move |processed: &ProcessedInput, _ctx| {
862        let mut processed = processed.clone();
863        // Decision 7 (LIB-06): inject add-specific params inside the task.
864        if let Some(ref ns) = add_params.node_set_json {
865            processed.node_set = Some(ns.clone());
866        }
867        if let Some(w) = add_params.importance_weight {
868            processed.importance_weight = Some(w);
869        }
870        let override_ds = add_params.target_dataset_id;
871        let database = Arc::clone(&database);
872        let dataset_name = dataset_name.clone();
873        let acl_db = acl_db.clone();
874        Box::pin(async move {
875            persist_data_with_acl(
876                &processed,
877                &*database,
878                &dataset_name,
879                owner_id,
880                tenant_id,
881                acl_db.as_deref(),
882                override_ds,
883            )
884            .await
885            .map(Box::new)
886            .map_err(|e| format!("{e}").into())
887        })
888    })
889}
890
891// ---------------------------------------------------------------------------
892// Pipeline builder
893// ---------------------------------------------------------------------------
894
895/// Build a complete ingest [`Pipeline`]: [`DataInput`] → [`ProcessedInput`] → [`Data`].
896pub fn build_add_pipeline(
897    storage: Arc<dyn StorageTrait>,
898    database: Arc<dyn IngestDb>,
899    hash_algorithm: HashAlgorithm,
900    dataset_name: &str,
901    owner_id: Uuid,
902    tenant_id: Option<Uuid>,
903) -> Pipeline {
904    build_add_pipeline_with_acl(
905        storage,
906        database,
907        hash_algorithm,
908        dataset_name,
909        owner_id,
910        tenant_id,
911        None,
912    )
913}
914
915/// Like [`build_add_pipeline`], but optionally grants ACL permissions on
916/// newly created datasets.
917pub fn build_add_pipeline_with_acl(
918    storage: Arc<dyn StorageTrait>,
919    database: Arc<dyn IngestDb>,
920    hash_algorithm: HashAlgorithm,
921    dataset_name: &str,
922    owner_id: Uuid,
923    tenant_id: Option<Uuid>,
924    acl_db: Option<Arc<dyn AclDb>>,
925) -> Pipeline {
926    build_add_pipeline_internal(
927        storage,
928        database,
929        hash_algorithm,
930        dataset_name,
931        owner_id,
932        tenant_id,
933        acl_db,
934        AddParamsInjection::default(),
935    )
936}
937
938/// Internal builder used by both [`build_add_pipeline_with_acl`] and the
939/// executor-routed [`AddPipeline::add_with_params`]. Threads
940/// [`AddParamsInjection`] into the persist task and attaches a
941/// `data_id_fn` so the executor's `run_info["data"]` carrier is populated
942/// for the
943/// [`cognee_core::pipeline_run_registry::DbPipelineWatcher`] wired by
944/// [`AddPipeline::with_pipeline_run_repo`] (gap 08-07).
945#[allow(clippy::too_many_arguments)]
946fn build_add_pipeline_internal(
947    storage: Arc<dyn StorageTrait>,
948    database: Arc<dyn IngestDb>,
949    hash_algorithm: HashAlgorithm,
950    dataset_name: &str,
951    owner_id: Uuid,
952    tenant_id: Option<Uuid>,
953    acl_db: Option<Arc<dyn AclDb>>,
954    add_params: AddParamsInjection,
955) -> Pipeline {
956    // Locked Decision 4 (LIB-06): the per-input `data_id_fn` operates on
957    // the pipeline input (`DataInput`), which has no UUID until
958    // `persist_data` runs. Return `None` here; the executor's run_info
959    // `data_ids` stays empty (the watcher maps it to Python's `"None"`).
960    // Gap-08 task 07 revisits this once the watcher is real.
961    let data_id_fn: DataIdFn = Arc::new(|_v: Arc<dyn Value>| None);
962    PipelineBuilder::new_with_task(
963        "ingestion.add",
964        make_process_input_task(Arc::clone(&storage), hash_algorithm, owner_id, tenant_id),
965    )
966    .add_task(make_persist_data_task_with_acl_and_params(
967        database,
968        dataset_name.to_string(),
969        owner_id,
970        tenant_id,
971        acl_db,
972        add_params,
973    ))
974    // Persisted pipeline-run name; must match Python (add.py:319) and the
975    // canonical "add_pipeline" name DatasetManager::get_status queries.
976    .with_name("add_pipeline")
977    .with_data_id(data_id_fn)
978    .build()
979}
980
981// ---------------------------------------------------------------------------
982// AddPipeline — convenience wrapper
983// ---------------------------------------------------------------------------
984
/// Ingest pipeline driven by the cognee-core task framework.
///
/// Routes [`add`](Self::add) / [`add_with_params`](Self::add_with_params)
/// through [`cognee_core::pipeline::execute`] so the executor's lifecycle
/// hooks fire and `TaskContext`-aware tasks can publish run-scoped payload
/// (LIB-06 / gap 08-07).
///
/// Backend handles required by [`cognee_core::TaskContextBuilder`]
/// — `thread_pool`, `graph_db`, `vector_db`, `database` — must be attached
/// via the chainable builders before calling `add()`. `AddPipeline::new`
/// does **not** populate them; calling `add()` on an under-configured
/// pipeline returns `IngestionError::MissingBackend { ... }`.
///
/// For composable pipeline-based execution (with concurrency, retry, etc.),
/// use [`build_add_pipeline`] + [`cognee_core::execute`] directly.
///
/// [`cognee_core::execute`]: cognee_core::execute
pub struct AddPipeline {
    /// Storage backend handed to the process-input task.
    storage: Arc<dyn StorageTrait>,
    /// Ingest database handed to the persist task.
    database: Arc<dyn IngestDb>,
    /// Content-hash algorithm handed to the process-input task.
    hash_algorithm: HashAlgorithm,
    /// Optional ACL backend; set through `with_acl_db`.
    acl_db: Option<Arc<dyn AclDb>>,
    // ─── Executor-context handles (LIB-06) ────────────────────────────────
    // All four start as `None` and must be attached before `add()`;
    // a missing one yields `IngestionError::MissingBackend`.
    /// CPU pool for the task context; set through `with_thread_pool`.
    thread_pool: Option<Arc<dyn CpuPool>>,
    /// Graph backend for the task context; set through `with_graph_db`.
    graph_db: Option<Arc<dyn GraphDBTrait>>,
    /// Vector backend for the task context; set through `with_vector_db`.
    vector_db: Option<Arc<dyn VectorDB>>,
    /// Relational connection for the task context; set through
    /// `with_database`.
    db_connection: Option<Arc<DatabaseConnection>>,
    // ─── Pipeline-run trail (gap 08-07) ───────────────────────────────────
    /// Optional pipeline-run repository; `add_with_params` falls back to
    /// `NoopPipelineRunRepository` when this is `None`.
    pipeline_run_repo: Option<Arc<dyn PipelineRunRepository>>,
}
1015
1016impl AddPipeline {
1017    /// Create with the default MD5 hashing (Python-compatible).
1018    ///
1019    /// **Note:** `add()` routes through [`cognee_core::pipeline::execute`]
1020    /// and requires the four executor-context handles attached via
1021    /// [`with_thread_pool`](Self::with_thread_pool),
1022    /// [`with_graph_db`](Self::with_graph_db),
1023    /// [`with_vector_db`](Self::with_vector_db),
1024    /// [`with_database`](Self::with_database). A missing handle surfaces as
1025    /// `IngestionError::MissingBackend` at `add()` time.
1026    pub fn new(storage: Arc<dyn StorageTrait>, database: Arc<dyn IngestDb>) -> Self {
1027        Self {
1028            storage,
1029            database,
1030            hash_algorithm: HashAlgorithm::default(),
1031            acl_db: None,
1032            thread_pool: None,
1033            graph_db: None,
1034            vector_db: None,
1035            db_connection: None,
1036            pipeline_run_repo: None,
1037        }
1038    }
1039
1040    /// Create with an explicit hash algorithm.
1041    pub fn new_with_algorithm(
1042        storage: Arc<dyn StorageTrait>,
1043        database: Arc<dyn IngestDb>,
1044        hash_algorithm: HashAlgorithm,
1045    ) -> Self {
1046        Self {
1047            storage,
1048            database,
1049            hash_algorithm,
1050            acl_db: None,
1051            thread_pool: None,
1052            graph_db: None,
1053            vector_db: None,
1054            db_connection: None,
1055            pipeline_run_repo: None,
1056        }
1057    }
1058
1059    /// Enable ACL permission grants on newly created datasets.
1060    ///
1061    /// When set, the pipeline grants all four permissions (read, write, delete,
1062    /// share) to the owner on each newly created dataset, matching Python's
1063    /// `create_authorized_dataset()` behavior.
1064    pub fn with_acl_db(mut self, acl_db: Arc<dyn AclDb>) -> Self {
1065        self.acl_db = Some(acl_db);
1066        self
1067    }
1068
1069    /// Attach the CPU pool used by [`cognee_core::TaskContext`].
1070    pub fn with_thread_pool(mut self, pool: Arc<dyn CpuPool>) -> Self {
1071        self.thread_pool = Some(pool);
1072        self
1073    }
1074
1075    /// Attach the graph database backend used by [`cognee_core::TaskContext`].
1076    pub fn with_graph_db(mut self, graph: Arc<dyn GraphDBTrait>) -> Self {
1077        self.graph_db = Some(graph);
1078        self
1079    }
1080
1081    /// Attach the vector database backend used by [`cognee_core::TaskContext`].
1082    pub fn with_vector_db(mut self, vectors: Arc<dyn VectorDB>) -> Self {
1083        self.vector_db = Some(vectors);
1084        self
1085    }
1086
1087    /// Attach the relational [`DatabaseConnection`] used by
1088    /// [`cognee_core::TaskContext`]. This is the same SeaORM handle the
1089    /// SQL-backed `IngestDb` is built on.
1090    pub fn with_database(mut self, db: Arc<DatabaseConnection>) -> Self {
1091        self.db_connection = Some(db);
1092        self
1093    }
1094
1095    /// Attach the `PipelineRunRepository` used to persist the four-state
1096    /// `pipeline_runs` trail (gap 08-07).
1097    ///
1098    /// Embedded callers pass `Arc::new(NoopPipelineRunRepository::new())`
1099    /// (or simply omit this call — the pipeline defaults to no-op). CLI
1100    /// and HTTP callers pass `Arc::new(SeaOrmPipelineRunRepository::new(db))`
1101    /// so the rows surface in `/api/v1/activity/pipeline-runs`.
1102    pub fn with_pipeline_run_repo(mut self, repo: Arc<dyn PipelineRunRepository>) -> Self {
1103        self.pipeline_run_repo = Some(repo);
1104        self
1105    }
1106
1107    #[instrument(
1108        name = "ingestion.add",
1109        skip(self, inputs),
1110        fields(dataset_name, owner_id = %owner_id, inputs_count = inputs.len())
1111    )]
1112    pub async fn add(
1113        &self,
1114        inputs: Vec<DataInput>,
1115        dataset_name: &str,
1116        owner_id: Uuid,
1117        tenant_id: Option<Uuid>,
1118    ) -> Result<Vec<Data>, Box<dyn std::error::Error>> {
1119        self.add_with_params(
1120            inputs,
1121            dataset_name,
1122            owner_id,
1123            tenant_id,
1124            &AddParams::default(),
1125        )
1126        .await
1127    }
1128
1129    /// Like [`add`](Self::add), but accepts additional optional parameters.
1130    #[instrument(
1131        name = "ingestion.add_with_params",
1132        skip(self, inputs, params),
1133        fields(dataset_name, owner_id = %owner_id, inputs_count = inputs.len())
1134    )]
1135    pub async fn add_with_params(
1136        &self,
1137        inputs: Vec<DataInput>,
1138        dataset_name: &str,
1139        owner_id: Uuid,
1140        tenant_id: Option<Uuid>,
1141        params: &AddParams,
1142    ) -> Result<Vec<Data>, Box<dyn std::error::Error>> {
1143        // ── Resolve executor-context handles ─────────────────────────────
1144        let thread_pool = self
1145            .thread_pool
1146            .clone()
1147            .ok_or(IngestionError::MissingBackend {
1148                which: "thread_pool",
1149            })?;
1150        let graph_db = self
1151            .graph_db
1152            .clone()
1153            .ok_or(IngestionError::MissingBackend { which: "graph_db" })?;
1154        let vector_db = self
1155            .vector_db
1156            .clone()
1157            .ok_or(IngestionError::MissingBackend { which: "vector_db" })?;
1158        let db_connection = self
1159            .db_connection
1160            .clone()
1161            .ok_or(IngestionError::MissingBackend { which: "database" })?;
1162
1163        // ── Pre-serialise add-params (Decision 7) ────────────────────────
1164        let node_set_json = params
1165            .node_set
1166            .as_ref()
1167            .map(serde_json::to_string)
1168            .transpose()
1169            .map_err(|e| format!("Failed to serialize node_set: {e}"))?;
1170        let add_params_inj = AddParamsInjection {
1171            node_set_json,
1172            importance_weight: params.importance_weight,
1173            target_dataset_id: params.dataset_id,
1174        };
1175
1176        // ── Build the typed pipeline ─────────────────────────────────────
1177        let pipeline = build_add_pipeline_internal(
1178            Arc::clone(&self.storage),
1179            Arc::clone(&self.database),
1180            self.hash_algorithm,
1181            dataset_name,
1182            owner_id,
1183            tenant_id,
1184            self.acl_db.clone(),
1185            add_params_inj,
1186        );
1187
1188        // ── Build the TaskContext ────────────────────────────────────────
1189        // The executor re-derives `PipelineRunInfo.pipeline_id` from
1190        // `(pipeline.name, user_id, dataset_id)` — see
1191        // `cognee_core::pipeline::execute` and `deterministic_pipeline_id`.
1192        // We carry `pipeline.id` here as the placeholder; the watcher
1193        // observes the derived value.
1194        let pipeline_ctx = PipelineContext {
1195            pipeline_id: pipeline.id,
1196            pipeline_name: pipeline.name.clone().unwrap_or_default(),
1197            user_id: Some(owner_id),
1198            tenant_id,
1199            dataset_id: params.dataset_id,
1200            current_data: None,
1201            run_id: None,
1202            user_email: None,
1203            provenance_visited: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
1204        };
1205
1206        let (_cancel_handle, ctx) = TaskContextBuilder::new()
1207            .thread_pool(thread_pool)
1208            .database(db_connection)
1209            .graph_db(graph_db)
1210            .vector_db(vector_db)
1211            .pipeline_context(pipeline_ctx)
1212            .build()
1213            .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
1214        let ctx = Arc::new(ctx);
1215
1216        // ── Erase typed inputs ───────────────────────────────────────────
1217        let typed_inputs: Vec<Arc<dyn Value>> = inputs
1218            .into_iter()
1219            .map(|i| Arc::new(i) as Arc<dyn Value>)
1220            .collect();
1221
1222        // ── Run the executor (gap 08-07 Decision 11: DbPipelineWatcher
1223        //    persists the four-state `pipeline_runs` trail; defaults to a
1224        //    no-op repo when the caller hasn't attached one). ───────────────
1225        let pipeline_run_repo = self
1226            .pipeline_run_repo
1227            .clone()
1228            .unwrap_or_else(cognee_database::NoopPipelineRunRepository::arc);
1229        let watcher = DbPipelineWatcher::new(pipeline_run_repo);
1230        let outputs = cognee_core::pipeline::execute(&pipeline, typed_inputs, ctx, &watcher)
1231            .await
1232            .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
1233
1234        extract_data_outputs(outputs)
1235    }
1236}
1237
1238// ---------------------------------------------------------------------------
1239// Output extraction (Decision 9)
1240// ---------------------------------------------------------------------------
1241
1242/// Downcast the executor's [`Arc<dyn Value>`] outputs back to the concrete
1243/// [`Data`] type the convenience function promises.
1244///
1245/// Returns [`IngestionError::OutputTypeMismatch`] when the downcast fails —
1246/// a programmer error indicating the pipeline's last task does not emit
1247/// `Data`.
1248fn extract_data_outputs(
1249    outputs: Vec<Arc<dyn Value>>,
1250) -> Result<Vec<Data>, Box<dyn std::error::Error>> {
1251    let mut data_vec = Vec::with_capacity(outputs.len());
1252    for o in outputs {
1253        // Explicit deref through `Arc` to reach the inner `dyn Value`, then
1254        // call `as_any` via vtable dispatch. Without this, method resolution
1255        // finds `<Arc<dyn Value> as Value>::as_any()` (via the blanket impl)
1256        // which downcasts to `Arc<dyn Value>` and never to `Data`. Mirrors
1257        // the pattern in `cognee_core::task::Task::borrow_input`.
1258        let d = (*o).as_any().downcast_ref::<Data>().cloned().ok_or(
1259            IngestionError::OutputTypeMismatch {
1260                expected: "Data",
1261                actual: "unknown",
1262            },
1263        )?;
1264        data_vec.push(d);
1265    }
1266    Ok(data_vec)
1267}
1268
1269// ---------------------------------------------------------------------------
1270// IngestionError
1271// ---------------------------------------------------------------------------
1272
/// Errors specific to the ingest pipeline in this module.
///
/// Covers an under-configured [`AddPipeline`] (a required executor-context
/// handle was not attached), executor outputs that cannot be downcast to
/// [`Data`], and inputs whose kind or document type this build cannot
/// ingest.
#[derive(Debug, thiserror::Error)]
pub enum IngestionError {
    /// A required `AddPipeline` builder field was not attached before
    /// calling `add()`. `which` names the missing handle: `"thread_pool"`,
    /// `"graph_db"`, `"vector_db"` or `"database"`.
    #[error("AddPipeline missing required backend: {which}")]
    MissingBackend { which: &'static str },
    /// The executor returned an output the pipeline cannot downcast to the
    /// expected concrete type. `expected` is the type name the pipeline
    /// promised; `actual` is reported as `"unknown"` by the output
    /// extraction in this module.
    #[error("AddPipeline output type mismatch: expected {expected}, actual {actual}")]
    OutputTypeMismatch {
        expected: &'static str,
        actual: &'static str,
    },
    /// A `DataInput::Url` was supplied but the `html-loader` feature (which
    /// provides URL crawling/fetching) is not enabled in this build.
    #[error("URL ingestion requires the `html-loader` feature to be enabled")]
    UrlIngestionUnavailable,
    /// A `DataInput::S3Path` was supplied. S3 ingestion is not yet implemented
    /// (decision D3); the loader-dispatch path must not store raw S3 bytes.
    #[error("S3 ingestion is not yet implemented")]
    S3IngestionUnavailable,
    /// No loader is registered for the document type derived from the input's
    /// extension (the relevant loader feature is disabled in this build).
    /// `document_type` carries the type name that had no loader.
    #[error("Unsupported document type at ingest: {document_type}")]
    UnsupportedDocumentType { document_type: String },
}
1302
1303// ---------------------------------------------------------------------------
1304// Tests
1305// ---------------------------------------------------------------------------
1306
1307#[cfg(test)]
1308#[allow(
1309    clippy::unwrap_used,
1310    clippy::expect_used,
1311    reason = "test code — panics are acceptable failures"
1312)]
1313mod tests {
1314    use super::*;
1315    use cognee_database::{connect, initialize, ops};
1316    use cognee_graph::MockGraphDB;
1317    #[cfg(feature = "html-loader")]
1318    use cognee_storage::LocalStorage;
1319    use cognee_storage::MockStorage;
1320    use cognee_vector::MockVectorDB;
1321    #[cfg(feature = "html-loader")]
1322    use mockito::{Server, ServerGuard};
1323    use std::io::Write;
1324    use tempfile::NamedTempFile;
1325
1326    async fn make_pipeline() -> (AddPipeline, Arc<cognee_database::DatabaseConnection>) {
1327        let db = connect("sqlite::memory:").await.unwrap();
1328        initialize(&db).await.unwrap();
1329        let db = Arc::new(db);
1330        let storage: Arc<dyn StorageTrait> = Arc::new(MockStorage::new());
1331        let pipeline = AddPipeline::new(storage, db.clone() as Arc<dyn IngestDb>)
1332            .with_thread_pool(Arc::new(RayonThreadPool::with_default_threads().unwrap()))
1333            .with_graph_db(Arc::new(MockGraphDB::new()))
1334            .with_vector_db(Arc::new(MockVectorDB::new()))
1335            .with_database(Arc::clone(&db));
1336        (pipeline, db)
1337    }
1338
1339    #[cfg(feature = "html-loader")]
1340    async fn server_with_robots() -> ServerGuard {
1341        let mut server = Server::new_async().await;
1342        server
1343            .mock("GET", "/robots.txt")
1344            .with_status(404)
1345            .create_async()
1346            .await;
1347        server
1348    }
1349
1350    #[cfg(feature = "html-loader")]
1351    #[tokio::test]
1352    async fn test_process_input_url_html_stores_text_with_source_metadata() {
1353        let mut server = server_with_robots().await;
1354        let html = "<html><head><title>Example</title><style>.x{display:none}</style></head><body><h1>Visible text</h1><script>hidden()</script></body></html>";
1355        let url = format!("{}/page.html", server.url());
1356        let _mock = server
1357            .mock("GET", "/page.html")
1358            .with_header("content-type", "text/html; charset=utf-8")
1359            .with_body(html)
1360            .create_async()
1361            .await;
1362        let temp_dir = tempfile::tempdir().unwrap();
1363        let storage = LocalStorage::new(temp_dir.path().to_path_buf());
1364
1365        let processed = process_input(
1366            &DataInput::Url(url.clone()),
1367            &storage,
1368            HashAlgorithm::Md5,
1369            Uuid::new_v4(),
1370            None,
1371        )
1372        .await
1373        .unwrap();
1374
1375        let raw_source_uri = processed.raw_source_uri.as_ref().unwrap();
1376        let raw_html = storage.retrieve(raw_source_uri).await.unwrap();
1377        assert_eq!(raw_html, html.as_bytes());
1378        assert!(raw_source_uri.ends_with(".html"));
1379        assert!(processed.raw_data_uri.ends_with(".txt"));
1380        assert_ne!(processed.raw_data_uri, *raw_source_uri);
1381
1382        let stored = storage.retrieve(&processed.raw_data_uri).await.unwrap();
1383        let stored_text = String::from_utf8(stored).unwrap();
1384        assert!(stored_text.contains("Visible text"));
1385        assert!(!stored_text.contains("<html>"));
1386        assert!(!stored_text.contains("hidden()"));
1387        assert!(!stored_text.contains("display:none"));
1388        assert_eq!(processed.stored_extension, "txt");
1389        assert_eq!(processed.stored_mime_type, "text/plain");
1390        assert_eq!(processed.original_extension, "html");
1391        assert_eq!(processed.original_mime_type, "text/html");
1392        assert_eq!(processed.loader_engine, "beautiful_soup_loader");
1393        assert_eq!(processed.original_location, *raw_source_uri);
1394
1395        let metadata: serde_json::Value =
1396            serde_json::from_str(processed.external_metadata.as_ref().unwrap()).unwrap();
1397        assert_eq!(metadata["source"], "url");
1398        assert_eq!(metadata["url"], url);
1399        assert_eq!(metadata["final_url"], url);
1400        assert_eq!(metadata["content_type"], "text/html; charset=utf-8");
1401        assert_eq!(metadata["title"], "Example");
1402    }
1403
1404    #[cfg(feature = "html-loader")]
1405    #[tokio::test]
1406    async fn test_persist_data_url_html_uses_text_payload_and_raw_html_original_location() {
1407        let mut server = server_with_robots().await;
1408        let url = format!("{}/page", server.url());
1409        let html = "<html><head><title>XHTML</title></head><body>XHTML body</body></html>";
1410        let _mock = server
1411            .mock("GET", "/page")
1412            .with_header("content-type", "application/xhtml+xml")
1413            .with_body(html)
1414            .create_async()
1415            .await;
1416        let db = connect("sqlite::memory:").await.unwrap();
1417        initialize(&db).await.unwrap();
1418        let temp_dir = tempfile::tempdir().unwrap();
1419        let storage = LocalStorage::new(temp_dir.path().to_path_buf());
1420        let owner_id = Uuid::new_v4();
1421
1422        let processed = process_input(
1423            &DataInput::Url(url),
1424            &storage,
1425            HashAlgorithm::Md5,
1426            owner_id,
1427            None,
1428        )
1429        .await
1430        .unwrap();
1431        let data = persist_data(&processed, &db, "url-html", owner_id, None)
1432            .await
1433            .unwrap();
1434
1435        assert_eq!(data.extension, "txt");
1436        assert_eq!(data.mime_type, "text/plain");
1437        assert!(data.raw_data_location.ends_with(".txt"));
1438        assert!(data.original_data_location.ends_with(".html"));
1439        assert_ne!(data.raw_data_location, data.original_data_location);
1440        assert_eq!(
1441            storage
1442                .retrieve(&data.original_data_location)
1443                .await
1444                .unwrap(),
1445            html.as_bytes()
1446        );
1447        assert_eq!(data.original_extension.as_deref(), Some("html"));
1448        assert_eq!(
1449            data.original_mime_type.as_deref(),
1450            Some("application/xhtml+xml")
1451        );
1452        assert_eq!(data.loader_engine.as_deref(), Some("beautiful_soup_loader"));
1453    }
1454
1455    #[cfg(feature = "html-loader")]
1456    #[tokio::test]
1457    async fn test_data_item_url_merges_metadata_and_preserves_label() {
1458        let mut server = server_with_robots().await;
1459        let url = format!("{}/wrapped", server.url());
1460        let _mock = server
1461            .mock("GET", "/wrapped")
1462            .with_header("content-type", "text/html")
1463            .with_body("<html><head><title>Wrapped</title></head><body>Wrapped body</body></html>")
1464            .create_async()
1465            .await;
1466        let db = connect("sqlite::memory:").await.unwrap();
1467        initialize(&db).await.unwrap();
1468        let temp_dir = tempfile::tempdir().unwrap();
1469        let storage = LocalStorage::new(temp_dir.path().to_path_buf());
1470        let owner_id = Uuid::new_v4();
1471        let processed = process_input(
1472            &DataInput::DataItem {
1473                data: Box::new(DataInput::Url(url.clone())),
1474                label: "wrapped-label".to_string(),
1475                external_metadata: Some(r#"{"custom":"keep","rank":7}"#.to_string()),
1476            },
1477            &storage,
1478            HashAlgorithm::Md5,
1479            owner_id,
1480            None,
1481        )
1482        .await
1483        .unwrap();
1484        let data = persist_data(&processed, &db, "wrapped-url", owner_id, None)
1485            .await
1486            .unwrap();
1487
1488        assert_eq!(data.label.as_deref(), Some("wrapped-label"));
1489        assert_eq!(data.extension, "txt");
1490        assert!(data.original_data_location.ends_with(".html"));
1491        let metadata: serde_json::Value =
1492            serde_json::from_str(data.external_metadata.as_ref().unwrap()).unwrap();
1493        assert_eq!(metadata["custom"], "keep");
1494        assert_eq!(metadata["rank"], 7);
1495        assert_eq!(metadata["source"], "url");
1496        assert_eq!(metadata["url"], url);
1497        assert_eq!(metadata["final_url"], url);
1498        assert_eq!(metadata["content_type"], "text/html");
1499        assert_eq!(metadata["title"], "Wrapped");
1500    }
1501
1502    #[cfg(feature = "html-loader")]
1503    #[tokio::test]
1504    async fn test_data_item_url_invalid_or_non_object_metadata_preserved_under_raw_field() {
1505        let mut server = server_with_robots().await;
1506        let cases = [
1507            ("/invalid-meta", "not-json"),
1508            ("/non-object-meta", r#"["not","an","object"]"#),
1509        ];
1510
1511        for (path, user_metadata) in cases {
1512            let url = format!("{}{}", server.url(), path);
1513            let _mock = server
1514                .mock("GET", path)
1515                .with_header("content-type", "text/html")
1516                .with_body("<html><body>Metadata body</body></html>")
1517                .create_async()
1518                .await;
1519            let temp_dir = tempfile::tempdir().unwrap();
1520            let storage = LocalStorage::new(temp_dir.path().to_path_buf());
1521            let owner_id = Uuid::new_v4();
1522
1523            let processed = process_input(
1524                &DataInput::DataItem {
1525                    data: Box::new(DataInput::Url(url.clone())),
1526                    label: "invalid-meta".to_string(),
1527                    external_metadata: Some(user_metadata.to_string()),
1528                },
1529                &storage,
1530                HashAlgorithm::Md5,
1531                owner_id,
1532                None,
1533            )
1534            .await
1535            .unwrap();
1536
1537            let metadata: serde_json::Value =
1538                serde_json::from_str(processed.external_metadata.as_ref().unwrap()).unwrap();
1539            assert_eq!(metadata["data_item_external_metadata_raw"], user_metadata);
1540            assert_eq!(metadata["source"], "url");
1541            assert_eq!(metadata["url"], url);
1542        }
1543    }
1544
1545    #[cfg(feature = "html-loader")]
1546    #[tokio::test]
1547    async fn test_non_html_url_inputs_do_not_store_raw_source_copy() {
1548        let mut server = server_with_robots().await;
1549        let cases = [
1550            ("/plain", "text/plain", "plain body", "txt", "text/plain"),
1551            (
1552                "/json",
1553                "application/json",
1554                r#"{"hello":"world"}"#,
1555                "json",
1556                "application/json",
1557            ),
1558            ("/csv", "text/csv", "a,b\n1,2\n", "csv", "text/csv"),
1559            (
1560                "/pdf",
1561                "application/pdf",
1562                "%PDF-1.7\n",
1563                "pdf",
1564                "application/pdf",
1565            ),
1566        ];
1567
1568        for (path, content_type, body, expected_ext, expected_mime) in cases {
1569            let url = format!("{}{}", server.url(), path);
1570            let _mock = server
1571                .mock("GET", path)
1572                .with_header("content-type", content_type)
1573                .with_body(body)
1574                .create_async()
1575                .await;
1576            let temp_dir = tempfile::tempdir().unwrap();
1577            let storage = LocalStorage::new(temp_dir.path().to_path_buf());
1578            let processed = process_input(
1579                &DataInput::Url(url.clone()),
1580                &storage,
1581                HashAlgorithm::Md5,
1582                Uuid::new_v4(),
1583                None,
1584            )
1585            .await
1586            .unwrap();
1587
1588            assert_eq!(processed.raw_source_uri, None);
1589            assert_eq!(processed.original_location, url);
1590            assert_eq!(processed.stored_extension, expected_ext);
1591            assert_eq!(processed.stored_mime_type, expected_mime);
1592            assert_eq!(
1593                storage.retrieve(&processed.raw_data_uri).await.unwrap(),
1594                body.as_bytes()
1595            );
1596        }
1597    }
1598
1599    #[tokio::test]
1600    async fn test_add_text_input() {
1601        let (pipeline, db) = make_pipeline().await;
1602        let owner_id = Uuid::new_v4();
1603
1604        let inputs = vec![DataInput::Text("Hello, world!".to_string())];
1605        let result = pipeline.add(inputs, "test_dataset", owner_id, None).await;
1606        assert!(result.is_ok(), "add should succeed: {:?}", result.err());
1607
1608        let data = result.unwrap();
1609        assert_eq!(data.len(), 1);
1610        // Name for text inputs is text_<hash>
1611        assert!(
1612            data[0].name.starts_with("text_"),
1613            "name should start with text_"
1614        );
1615        assert_eq!(data[0].mime_type, "text/plain");
1616        assert_eq!(data[0].extension, "txt");
1617
1618        let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1619            .await
1620            .unwrap();
1621        assert_eq!(datasets.len(), 1);
1622        let ds_data = ops::datasets::get_dataset_data(&db, datasets[0].id)
1623            .await
1624            .unwrap();
1625        assert_eq!(ds_data.len(), 1);
1626    }
1627
1628    #[tokio::test]
1629    async fn test_add_file_input() {
1630        let (pipeline, db) = make_pipeline().await;
1631        let owner_id = Uuid::new_v4();
1632
1633        let mut temp_file = NamedTempFile::new().unwrap();
1634        writeln!(temp_file, "Test file content").unwrap();
1635        let file_path = temp_file.path().to_str().unwrap().to_string();
1636
1637        let inputs = vec![DataInput::FilePath(file_path)];
1638        let result = pipeline.add(inputs, "test_dataset", owner_id, None).await;
1639        assert!(result.is_ok());
1640
1641        let data = result.unwrap();
1642        assert_eq!(data.len(), 1);
1643        assert!(!data[0].name.is_empty());
1644
1645        let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1646            .await
1647            .unwrap();
1648        assert_eq!(datasets.len(), 1);
1649    }
1650
1651    #[tokio::test]
1652    async fn test_add_multiple_inputs() {
1653        let (pipeline, db) = make_pipeline().await;
1654        let owner_id = Uuid::new_v4();
1655
1656        let inputs = vec![
1657            DataInput::Text("First text".to_string()),
1658            DataInput::Text("Second text".to_string()),
1659        ];
1660        let result = pipeline.add(inputs, "test_dataset", owner_id, None).await;
1661        assert!(result.is_ok());
1662
1663        let data = result.unwrap();
1664        assert_eq!(data.len(), 2);
1665
1666        let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1667            .await
1668            .unwrap();
1669        assert_eq!(datasets.len(), 1);
1670        let ds_data = ops::datasets::get_dataset_data(&db, datasets[0].id)
1671            .await
1672            .unwrap();
1673        assert_eq!(ds_data.len(), 2);
1674    }
1675
1676    #[tokio::test]
1677    async fn test_deduplication_same_content() {
1678        let (pipeline, db) = make_pipeline().await;
1679        let owner_id = Uuid::new_v4();
1680
1681        let content = "Duplicate content";
1682        let result1 = pipeline
1683            .add(
1684                vec![DataInput::Text(content.to_string())],
1685                "test_dataset",
1686                owner_id,
1687                None,
1688            )
1689            .await
1690            .unwrap();
1691        let result2 = pipeline
1692            .add(
1693                vec![DataInput::Text(content.to_string())],
1694                "test_dataset",
1695                owner_id,
1696                None,
1697            )
1698            .await
1699            .unwrap();
1700
1701        assert_eq!(result1[0].id, result2[0].id);
1702        assert_eq!(result1[0].content_hash, result2[0].content_hash);
1703
1704        let dataset = ops::datasets::get_dataset_by_name(&db, "test_dataset", owner_id, None)
1705            .await
1706            .unwrap()
1707            .unwrap();
1708        let ds_data = ops::datasets::get_dataset_data(&db, dataset.id)
1709            .await
1710            .unwrap();
1711        assert_eq!(ds_data.len(), 1);
1712    }
1713
1714    #[tokio::test]
1715    async fn test_different_owners_same_hash_different_ids() {
1716        let (pipeline, _db) = make_pipeline().await;
1717        let owner1 = Uuid::new_v4();
1718        let owner2 = Uuid::new_v4();
1719
1720        let result1 = pipeline
1721            .add(
1722                vec![DataInput::Text("Same content".to_string())],
1723                "ds1",
1724                owner1,
1725                None,
1726            )
1727            .await
1728            .unwrap();
1729        let result2 = pipeline
1730            .add(
1731                vec![DataInput::Text("Same content".to_string())],
1732                "ds2",
1733                owner2,
1734                None,
1735            )
1736            .await
1737            .unwrap();
1738
1739        // Content hash is content-only (Python compat): same content → same hash
1740        assert_eq!(
1741            result1[0].content_hash, result2[0].content_hash,
1742            "content hash is owner-independent"
1743        );
1744        // But data_id differs because owner_id is mixed into UUID5 seed
1745        assert_ne!(result1[0].id, result2[0].id, "data_id must differ by owner");
1746    }
1747
1748    #[tokio::test]
1749    async fn test_multiple_datasets() {
1750        let (pipeline, db) = make_pipeline().await;
1751        let owner_id = Uuid::new_v4();
1752
1753        pipeline
1754            .add(
1755                vec![DataInput::Text("Content 1".to_string())],
1756                "dataset1",
1757                owner_id,
1758                None,
1759            )
1760            .await
1761            .unwrap();
1762        pipeline
1763            .add(
1764                vec![DataInput::Text("Content 2".to_string())],
1765                "dataset2",
1766                owner_id,
1767                None,
1768            )
1769            .await
1770            .unwrap();
1771
1772        let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1773            .await
1774            .unwrap();
1775        assert_eq!(datasets.len(), 2);
1776    }
1777
1778    #[tokio::test]
1779    async fn test_reuse_dataset() {
1780        let (pipeline, db) = make_pipeline().await;
1781        let owner_id = Uuid::new_v4();
1782
1783        pipeline
1784            .add(
1785                vec![DataInput::Text("Content 1".to_string())],
1786                "same_dataset",
1787                owner_id,
1788                None,
1789            )
1790            .await
1791            .unwrap();
1792        pipeline
1793            .add(
1794                vec![DataInput::Text("Content 2".to_string())],
1795                "same_dataset",
1796                owner_id,
1797                None,
1798            )
1799            .await
1800            .unwrap();
1801
1802        let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1803            .await
1804            .unwrap();
1805        assert_eq!(datasets.len(), 1);
1806        let ds_data = ops::datasets::get_dataset_data(&db, datasets[0].id)
1807            .await
1808            .unwrap();
1809        assert_eq!(ds_data.len(), 2);
1810    }
1811
1812    #[tokio::test]
1813    async fn test_content_hash_deterministic() {
1814        let (pipeline, _db) = make_pipeline().await;
1815        let owner_id = Uuid::new_v4();
1816
1817        let result1 = pipeline
1818            .add(
1819                vec![DataInput::Text("Test content".to_string())],
1820                "dataset1",
1821                owner_id,
1822                None,
1823            )
1824            .await
1825            .unwrap();
1826        let result2 = pipeline
1827            .add(
1828                vec![DataInput::Text("Test content".to_string())],
1829                "dataset1",
1830                owner_id,
1831                None,
1832            )
1833            .await
1834            .unwrap();
1835
1836        assert_eq!(result1[0].content_hash, result2[0].content_hash);
1837        assert_eq!(result1[0].id, result2[0].id);
1838    }
1839
1840    /// Regression guard for telemetry gap 05-02 (DataPoint provenance audit).
1841    ///
1842    /// Provenance task 05-03's `extract_content_hash_from_value` walks every
1843    /// input `Data` looking for the first non-empty `content_hash`. If any
1844    /// ingestion path leaves the column empty, downstream stamping silently
1845    /// regresses. This test asserts that every `DataInput` variant reachable
1846    /// from `process_data_input` produces a non-empty hex digest, and that
1847    /// the value round-trips through the SeaORM `Data` <-> `data::Model`
1848    /// conversion (the canonical DB read path).
1849    #[tokio::test]
1850    async fn test_content_hash_non_empty_across_variants_and_db_roundtrip() {
1851        let (pipeline, db) = make_pipeline().await;
1852        let owner_id = Uuid::new_v4();
1853
1854        // 1. Text input.
1855        let text_data = pipeline
1856            .add(
1857                vec![DataInput::Text("Provenance audit text".to_string())],
1858                "audit_text",
1859                owner_id,
1860                None,
1861            )
1862            .await
1863            .unwrap();
1864        assert!(
1865            !text_data[0].content_hash.is_empty(),
1866            "Text input must populate content_hash"
1867        );
1868
1869        // 2. File input.
1870        let mut temp_file = NamedTempFile::new().unwrap();
1871        writeln!(temp_file, "Provenance audit file").unwrap();
1872        let file_path = temp_file.path().to_str().unwrap().to_string();
1873        let file_data = pipeline
1874            .add(
1875                vec![DataInput::FilePath(file_path)],
1876                "audit_file",
1877                owner_id,
1878                None,
1879            )
1880            .await
1881            .unwrap();
1882        assert!(
1883            !file_data[0].content_hash.is_empty(),
1884            "FilePath input must populate content_hash"
1885        );
1886
1887        // 3. Binary input.
1888        let binary_data = pipeline
1889            .add(
1890                vec![DataInput::Binary {
1891                    name: "audit.bin".to_string(),
1892                    data: b"provenance audit binary".to_vec(),
1893                }],
1894                "audit_binary",
1895                owner_id,
1896                None,
1897            )
1898            .await
1899            .unwrap();
1900        assert!(
1901            !binary_data[0].content_hash.is_empty(),
1902            "Binary input must populate content_hash"
1903        );
1904
1905        // 4. DataItem wrapper — must propagate the inner Text variant's hash.
1906        let wrapped = pipeline
1907            .add(
1908                vec![DataInput::DataItem {
1909                    data: Box::new(DataInput::Text("Wrapped audit text".to_string())),
1910                    label: "wrapped".to_string(),
1911                    external_metadata: None,
1912                }],
1913                "audit_wrapped",
1914                owner_id,
1915                None,
1916            )
1917            .await
1918            .unwrap();
1919        assert!(
1920            !wrapped[0].content_hash.is_empty(),
1921            "DataItem(Text) must populate content_hash"
1922        );
1923
1924        // 5. Round-trip through the DB read path: the value `add()` returned
1925        //    must equal the value `get_data()` reads back via
1926        //    `From<data::Model> for Data`.
1927        let reread = ops::data::get_data(&db, text_data[0].id)
1928            .await
1929            .unwrap()
1930            .expect("data row exists immediately after add()");
1931        assert_eq!(
1932            reread.content_hash, text_data[0].content_hash,
1933            "content_hash must round-trip through SeaORM <-> Data conversion"
1934        );
1935    }
1936
1937    #[tokio::test]
1938    async fn test_dataset_id_is_deterministic() {
1939        let (pipeline, db) = make_pipeline().await;
1940        let owner_id = Uuid::new_v4();
1941
1942        pipeline
1943            .add(
1944                vec![DataInput::Text("any content".to_string())],
1945                "my_dataset",
1946                owner_id,
1947                None,
1948            )
1949            .await
1950            .unwrap();
1951        pipeline
1952            .add(
1953                vec![DataInput::Text("other content".to_string())],
1954                "my_dataset",
1955                owner_id,
1956                None,
1957            )
1958            .await
1959            .unwrap();
1960
1961        // There must be exactly one dataset (deterministic ID prevents duplicates)
1962        let datasets = ops::datasets::list_datasets_by_owner(&db, owner_id)
1963            .await
1964            .unwrap();
1965        assert_eq!(
1966            datasets.len(),
1967            1,
1968            "deterministic dataset ID should prevent duplicate creation"
1969        );
1970    }
1971
1972    #[tokio::test]
1973    async fn test_loader_engine_populated() {
1974        // After running the loader at ADD time, the stored artifact is the
1975        // extracted text and `loader_engine` reflects the loader that ran.
1976        // A `.txt` file runs the always-available text loader.
1977        let (pipeline, _db) = make_pipeline().await;
1978        let owner_id = Uuid::new_v4();
1979
1980        let mut temp_file = NamedTempFile::new().unwrap();
1981        writeln!(temp_file, "content").unwrap();
1982        let txt_path = temp_file.path().with_extension("txt");
1983        std::fs::copy(temp_file.path(), &txt_path).unwrap();
1984
1985        let result = pipeline
1986            .add(
1987                vec![DataInput::FilePath(txt_path.to_str().unwrap().to_string())],
1988                "ds",
1989                owner_id,
1990                None,
1991            )
1992            .await
1993            .unwrap();
1994
1995        assert_eq!(result[0].loader_engine.as_deref(), Some("text_loader"));
1996        assert_eq!(result[0].extension, "txt");
1997        let _ = std::fs::remove_file(&txt_path);
1998    }
1999
2000    /// With a loader feature disabled, ADD must surface a clear error rather
2001    /// than silently store raw bytes. Under default features no `pdf` loader is
2002    /// registered, so a `.pdf` input errors at ingest (Python parity gotcha).
2003    #[cfg(not(any(feature = "pdf-pdfium", feature = "pdf-pure-rust")))]
2004    #[tokio::test]
2005    async fn test_unsupported_loader_type_errors_at_add() {
2006        let (pipeline, _db) = make_pipeline().await;
2007        let owner_id = Uuid::new_v4();
2008
2009        let mut temp_file = NamedTempFile::new().unwrap();
2010        writeln!(temp_file, "%PDF-1.7").unwrap();
2011        let pdf_path = temp_file.path().with_extension("pdf");
2012        std::fs::copy(temp_file.path(), &pdf_path).unwrap();
2013
2014        let result = pipeline
2015            .add(
2016                vec![DataInput::FilePath(pdf_path.to_str().unwrap().to_string())],
2017                "ds",
2018                owner_id,
2019                None,
2020            )
2021            .await;
2022
2023        assert!(
2024            result.is_err(),
2025            "pdf input must error when the pdf loader feature is off"
2026        );
2027        let _ = std::fs::remove_file(&pdf_path);
2028    }
2029
2030    /// Text-path no-regression: the stored artifact is byte-identical to the
2031    /// input, `raw_content_hash == content_hash`, `extension == "txt"`, and the
2032    /// content hash / data_id match the pinned Python-compatible values.
2033    #[tokio::test]
2034    async fn test_text_path_no_regression_hashes_and_stored_bytes() {
2035        use cognee_storage::LocalStorage;
2036
2037        // Pinned values for the fixed string (MD5 of the bytes, verified by the
2038        // content_hasher unit tests: md5("hello world") == HELLO_WORLD_MD5).
2039        const HELLO_WORLD: &str = "hello world";
2040        const HELLO_WORLD_MD5: &str = "5eb63bbbe01eeed093cb22bb8f5acdc3";
2041
2042        let temp_dir = tempfile::tempdir().unwrap();
2043        let storage = LocalStorage::new(temp_dir.path().to_path_buf());
2044        let owner_id = Uuid::new_v4();
2045
2046        let processed = process_input(
2047            &DataInput::Text(HELLO_WORLD.to_string()),
2048            &storage,
2049            HashAlgorithm::Md5,
2050            owner_id,
2051            None,
2052        )
2053        .await
2054        .unwrap();
2055
2056        // content_hash + data_id derived from RAW bytes — SACRED, must not change.
2057        assert_eq!(processed.content_hash, HELLO_WORLD_MD5);
2058        assert_eq!(
2059            processed.data_id,
2060            generate_data_id(HELLO_WORLD_MD5, owner_id, None),
2061            "data_id must remain uuid5(content_hash + owner + tenant)"
2062        );
2063
2064        // For plain text the extracted text equals the raw bytes.
2065        assert_eq!(
2066            processed.raw_content_hash, processed.content_hash,
2067            "plain-text raw_content_hash must equal content_hash"
2068        );
2069        assert_eq!(processed.stored_extension, "txt");
2070        assert_eq!(processed.stored_mime_type, "text/plain");
2071        assert_eq!(processed.loader_engine, "text_loader");
2072
2073        // Stored file content is byte-identical to the input.
2074        let stored = storage.retrieve(&processed.raw_data_uri).await.unwrap();
2075        assert_eq!(stored, HELLO_WORLD.as_bytes());
2076    }
2077
2078    /// CSV path (feature-gated): the stored artifact is the EXTRACTED text
2079    /// (rows formatted as Python does), `extension == "txt"`, the stored file
2080    /// name ends in `text_<content_hash>.txt`, `original_extension == "csv"`,
2081    /// and `raw_content_hash != content_hash`.
2082    #[cfg(feature = "csv-loader")]
2083    #[tokio::test]
2084    async fn test_csv_path_stores_extracted_text() {
2085        use cognee_storage::LocalStorage;
2086
2087        let csv = "name,age\nAlice,30\nBob,25\n";
2088        let temp_dir = tempfile::tempdir().unwrap();
2089        let storage = LocalStorage::new(temp_dir.path().to_path_buf());
2090        let owner_id = Uuid::new_v4();
2091
2092        let mut temp_file = NamedTempFile::new().unwrap();
2093        write!(temp_file, "{csv}").unwrap();
2094        let csv_path = temp_file.path().with_extension("csv");
2095        std::fs::copy(temp_file.path(), &csv_path).unwrap();
2096
2097        let processed = process_input(
2098            &DataInput::FilePath(csv_path.to_str().unwrap().to_string()),
2099            &storage,
2100            HashAlgorithm::Md5,
2101            owner_id,
2102            None,
2103        )
2104        .await
2105        .unwrap();
2106
2107        // content_hash is the MD5 of the RAW csv bytes (unchanged).
2108        assert_eq!(
2109            processed.content_hash,
2110            crate::content_hasher::ContentHasher::hash_content(csv.as_bytes(), HashAlgorithm::Md5)
2111        );
2112        // Stored artifact is extracted text, not raw csv.
2113        assert_eq!(processed.stored_extension, "txt");
2114        assert_eq!(processed.stored_mime_type, "text/plain");
2115        assert_eq!(processed.original_extension, "csv");
2116        assert_eq!(processed.loader_engine, "csv_loader");
2117        assert_ne!(
2118            processed.raw_content_hash, processed.content_hash,
2119            "extracted csv text differs from raw bytes"
2120        );
2121
2122        let stored = storage.retrieve(&processed.raw_data_uri).await.unwrap();
2123        let stored_text = String::from_utf8(stored).unwrap();
2124        assert!(stored_text.contains("name: Alice, age: 30"));
2125        assert!(!stored_text.contains("name,age"));
2126
2127        // raw_content_hash is the MD5 of the stored extracted text.
2128        assert_eq!(
2129            processed.raw_content_hash,
2130            crate::content_hasher::ContentHasher::hash_content(
2131                stored_text.as_bytes(),
2132                HashAlgorithm::Md5
2133            )
2134        );
2135        let _ = std::fs::remove_file(&csv_path);
2136    }
2137
2138    /// S3 ingestion (decision D3) must error at ingest, never store raw bytes.
2139    #[tokio::test]
2140    async fn test_s3_input_errors_at_add() {
2141        use cognee_storage::LocalStorage;
2142
2143        let temp_dir = tempfile::tempdir().unwrap();
2144        let storage = LocalStorage::new(temp_dir.path().to_path_buf());
2145
2146        let result = process_input(
2147            &DataInput::S3Path("s3://bucket/key.txt".to_string()),
2148            &storage,
2149            HashAlgorithm::Md5,
2150            Uuid::new_v4(),
2151            None,
2152        )
2153        .await;
2154
2155        assert!(result.is_err(), "S3 input must error at ingest");
2156    }
2157
2158    #[tokio::test]
2159    async fn test_tenant_id_stored() {
2160        let (pipeline, _db) = make_pipeline().await;
2161        let owner_id = Uuid::new_v4();
2162        let tenant_id = Uuid::new_v4();
2163
2164        let result = pipeline
2165            .add(
2166                vec![DataInput::Text("tenant content".to_string())],
2167                "ds",
2168                owner_id,
2169                Some(tenant_id),
2170            )
2171            .await
2172            .unwrap();
2173
2174        assert_eq!(result[0].tenant_id, Some(tenant_id));
2175    }
2176
2177    #[tokio::test]
2178    async fn test_data_item_label_stored() {
2179        let (pipeline, _db) = make_pipeline().await;
2180        let owner_id = Uuid::new_v4();
2181
2182        let result = pipeline
2183            .add(
2184                vec![DataInput::DataItem {
2185                    data: Box::new(DataInput::Text("labelled content".to_string())),
2186                    label: "my-label".to_string(),
2187                    external_metadata: None,
2188                }],
2189                "ds",
2190                owner_id,
2191                None,
2192            )
2193            .await
2194            .unwrap();
2195
2196        assert_eq!(result.len(), 1);
2197        assert_eq!(
2198            result[0].label.as_deref(),
2199            Some("my-label"),
2200            "DataItem label must be stored in the Data record"
2201        );
2202    }
2203
2204    // ── extract_name — Python parity ────────────────────────────────────
2205
2206    #[test]
2207    fn extract_name_file_path_uses_stem_not_full_name() {
2208        // Python uses Path(file_path).stem which strips the extension.
2209        let input = DataInput::FilePath("documents/report.txt".into());
2210        let name = super::extract_name(&input, "abc123");
2211        assert_eq!(
2212            name, "report",
2213            "file path name should be stem (no extension)"
2214        );
2215    }
2216
2217    #[test]
2218    fn extract_name_file_path_with_file_uri() {
2219        let input = DataInput::FilePath("file:///tmp/data/notes.pdf".into());
2220        let name = super::extract_name(&input, "abc123");
2221        assert_eq!(name, "notes");
2222    }
2223
2224    #[test]
2225    fn extract_name_text_input_uses_hash() {
2226        let input = DataInput::Text("hello world".into());
2227        let name = super::extract_name(&input, "5eb63bbbe01eeed093cb22bb8f5acdc3");
2228        assert_eq!(name, "text_5eb63bbbe01eeed093cb22bb8f5acdc3");
2229    }
2230
2231    // ── mime type override for text-loader extensions ──────────────────
2232
2233    #[test]
2234    fn binary_md_file_gets_text_plain_mime() {
2235        let input = DataInput::Binary {
2236            name: "notes.md".to_string(),
2237            data: b"# Heading\nSome markdown".to_vec(),
2238        };
2239        let (_name, _ext, mime, _label) = super::extract_file_metadata(&input);
2240        assert_eq!(
2241            mime, "text/plain",
2242            ".md binary should produce text/plain, not text/markdown"
2243        );
2244    }
2245
2246    #[test]
2247    fn file_path_md_gets_text_plain_mime() {
2248        let input = DataInput::FilePath("/tmp/notes.md".to_string());
2249        let (_name, _ext, mime, _label) = super::extract_file_metadata(&input);
2250        assert_eq!(
2251            mime, "text/plain",
2252            ".md file path should produce text/plain, not text/markdown"
2253        );
2254    }
2255
2256    #[test]
2257    fn file_path_json_gets_text_plain_mime() {
2258        let input = DataInput::FilePath("/tmp/data.json".to_string());
2259        let (_name, _ext, mime, _label) = super::extract_file_metadata(&input);
2260        assert_eq!(
2261            mime, "text/plain",
2262            ".json file path should produce text/plain, not application/json"
2263        );
2264    }
2265
2266    #[test]
2267    fn file_path_pdf_keeps_original_mime() {
2268        let input = DataInput::FilePath("/tmp/doc.pdf".to_string());
2269        let (_name, _ext, mime, _label) = super::extract_file_metadata(&input);
2270        assert_ne!(
2271            mime, "text/plain",
2272            ".pdf should NOT be overridden to text/plain"
2273        );
2274    }
2275
2276    // ── external_metadata plumbing ────────────────────────────────────────
2277
2278    #[tokio::test]
2279    async fn test_data_item_external_metadata_stored() {
2280        let (pipeline, _db) = make_pipeline().await;
2281        let owner_id = Uuid::new_v4();
2282
2283        let meta_json = r#"{"source":"dlt","table_name":"orders"}"#.to_string();
2284        let result = pipeline
2285            .add(
2286                vec![DataInput::DataItem {
2287                    data: Box::new(DataInput::Text("dlt content".to_string())),
2288                    label: "dlt-label".to_string(),
2289                    external_metadata: Some(meta_json.clone()),
2290                }],
2291                "ds",
2292                owner_id,
2293                None,
2294            )
2295            .await
2296            .unwrap();
2297
2298        assert_eq!(result.len(), 1);
2299        assert_eq!(
2300            result[0].external_metadata.as_deref(),
2301            Some(meta_json.as_str()),
2302            "DataItem external_metadata must be stored in the Data record"
2303        );
2304    }
2305
2306    #[tokio::test]
2307    async fn test_data_item_without_metadata() {
2308        let (pipeline, _db) = make_pipeline().await;
2309        let owner_id = Uuid::new_v4();
2310
2311        let result = pipeline
2312            .add(
2313                vec![DataInput::DataItem {
2314                    data: Box::new(DataInput::Text("no metadata".to_string())),
2315                    label: "plain-label".to_string(),
2316                    external_metadata: None,
2317                }],
2318                "ds",
2319                owner_id,
2320                None,
2321            )
2322            .await
2323            .unwrap();
2324
2325        assert_eq!(result.len(), 1);
2326        assert_eq!(
2327            result[0].external_metadata, None,
2328            "DataItem with no external_metadata should produce None on Data"
2329        );
2330    }
2331}