Skip to main content

cognee_lib/api/
remember.rs

1//! One-call add + cognify + optional improve -- `remember()`.
2//!
3//! Composition of `add()` -> `cognify()` -> optionally `improve()` (via `memify`),
4//! with session-mode support.
5//!
6//! Equivalent to Python's `cognee.api.v1.remember.remember()`.
7
8use std::fmt;
9use std::sync::Arc;
10use std::time::Instant;
11
12use cognee_cognify::cognify;
13use cognee_cognify::{CognifyConfig, CognifyResult, MemifyConfig, MemifyResult, run_memify};
14use cognee_database::{
15    CheckpointStore, DatabaseConnection, PipelineRunRepository, SeaOrmPipelineRunRepository,
16    SessionLifecycleDb,
17};
18use cognee_embedding::EmbeddingEngine;
19use cognee_graph::GraphDBTrait;
20use cognee_ingestion::AddPipeline;
21use cognee_llm::Llm;
22use cognee_models::{DataInput, FeedbackEntry, MemoryEntry, QAEntry, TraceEntry};
23use cognee_ontology::OntologyResolver;
24use cognee_session::{SessionManager, SessionQAUpdate, SessionStore};
25use cognee_storage::StorageTrait;
26use cognee_vector::VectorDB;
27use serde::{Deserialize, Serialize};
28use tracing::{debug, info, warn};
29use uuid::Uuid;
30
31use super::error::ApiError;
32use super::improve::improve;
33
/// Status of a remember operation.
///
/// **Decision 15** — library layer emits CamelCase `"PipelineRunStarted"`/
/// `"PipelineRunCompleted"`/`"PipelineRunErrored"`/`"SessionStored"` so the
/// in-process Rust SDK shares one status vocabulary with
/// `cognee_core::PipelineRunStatus`. The HTTP layer (E-01) translates this
/// to Python's lowercase wire format at the DTO boundary for strict Python
/// wire parity.
///
/// The serialized names come from the `#[serde(rename = ...)]` attributes
/// below, not from the Rust variant identifiers (e.g. `Started` serializes
/// as `"PipelineRunStarted"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RememberStatus {
    /// Pipeline has been initiated/started but has not yet finished.
    ///
    /// Currently unused by the synchronous SDK [`remember`] (which always
    /// returns a terminal state). Exists for symmetry with
    /// [`cognee_core::pipeline::PipelineRunStatus`] and for future async /
    /// HTTP background-mode emission. Both the core `Initiated` and
    /// `Started` statuses convert to this variant via `From`.
    #[serde(rename = "PipelineRunStarted")]
    Started,
    /// Pipeline finished successfully.
    ///
    /// Reported by permanent-mode [`remember`] once ingestion and cognify
    /// have succeeded.
    #[serde(rename = "PipelineRunCompleted")]
    Completed,
    /// Pipeline finished with an error.
    ///
    /// Also reported by [`remember_entry`] when a feedback entry targets a
    /// QA id that is not present in the session.
    #[serde(rename = "PipelineRunErrored")]
    Errored,
    /// Session-mode only: data was stored in the session cache.
    ///
    /// Reported by session-mode [`remember`] and by successful
    /// [`remember_entry`] dispatches.
    #[serde(rename = "SessionStored")]
    SessionStored,
}
62
63impl From<cognee_core::pipeline::PipelineRunStatus> for RememberStatus {
64    fn from(s: cognee_core::pipeline::PipelineRunStatus) -> Self {
65        use cognee_core::pipeline::PipelineRunStatus;
66        match s {
67            PipelineRunStatus::Initiated | PipelineRunStatus::Started => Self::Started,
68            PipelineRunStatus::Completed => Self::Completed,
69            PipelineRunStatus::Errored => Self::Errored,
70        }
71    }
72}
73
/// Per-item information in the remember result.
///
/// Filled in by permanent-mode [`remember`], one entry per data item returned
/// by ingestion. Session-mode and typed-entry results carry an empty list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RememberItemInfo {
    /// Id of the ingested data item.
    pub id: Option<Uuid>,
    /// Name of the ingested data item.
    pub name: Option<String>,
    /// Content hash reported by ingestion for this item.
    pub content_hash: Option<String>,
    /// Token count (None when not yet computed).
    ///
    /// Permanent mode maps a negative value from ingestion to `None`.
    pub token_count: Option<i64>,
    /// Size of the raw data in bytes (None when unknown).
    ///
    /// Permanent mode maps a negative value from ingestion to `None`.
    pub data_size: Option<i64>,
    /// MIME type reported by ingestion for this item.
    pub mime_type: Option<String>,
}
86
/// Result of a `remember()` call.
///
/// All fields are populated before the function returns — `remember()` is
/// strictly synchronous. "Populated" means final: optional fields that do
/// not apply to the path taken (permanent / session / typed entry) are
/// left as `None` or empty.
#[derive(Debug, Clone, Serialize)]
pub struct RememberResult {
    /// Outcome of the call; see [`RememberStatus`] for which paths emit what.
    pub status: RememberStatus,
    /// Dataset name the caller passed in (echoed back on every path).
    pub dataset_name: String,
    /// Dataset id derived from name/owner/tenant; `None` in session mode and
    /// for typed entries.
    pub dataset_id: Option<Uuid>,
    /// `Some([session_id])` in session mode and for typed entries; `None` in
    /// permanent mode.
    pub session_ids: Option<Vec<String>>,
    /// Per-call run id in permanent mode; `None` otherwise.
    pub pipeline_run_id: Option<Uuid>,
    /// Wall-clock seconds the operation took. `None` when the operation has
    /// not produced a duration (Python parity:
    /// `RememberResult.elapsed_seconds: Optional[float]`).
    pub elapsed_seconds: Option<f64>,
    /// Content hash of the first item (Python parity for deduplication tracking).
    pub content_hash: Option<String>,
    /// Permanent mode: number of ingested items. Session mode: number of data
    /// inputs. Typed entries: 0.
    pub items_processed: usize,
    /// Per-item details; only non-empty in permanent mode.
    pub items: Vec<RememberItemInfo>,
    /// Non-fatal error text (e.g. a failed session improve, or a feedback
    /// entry whose QA id was not found). `None` on clean success.
    pub error: Option<String>,
    /// Type discriminator for typed-entry remember (`"qa"` / `"trace"` /
    /// `"feedback"`). Populated by `remember_entry()` (LIB-01) in the
    /// typed-entry path; `None` for the file/text path.
    pub entry_type: Option<String>,
    /// Typed-entry id from `SessionManager`. Populated alongside
    /// [`Self::entry_type`].
    pub entry_id: Option<String>,
    /// Raw cognify output (permanent mode only); not serialized.
    #[serde(skip)]
    pub cognify_result: Option<CognifyResult>,
    /// Raw memify output when self-improvement ran and succeeded; not
    /// serialized.
    #[serde(skip)]
    pub memify_result: Option<MemifyResult>,
}
119
120impl RememberResult {
121    /// Serialize to a plain JSON value (Python `to_dict()` parity).
122    pub fn to_dict(&self) -> serde_json::Value {
123        serde_json::to_value(self).unwrap_or(serde_json::Value::Null)
124    }
125
126    /// `true` if status is `Completed` or `SessionStored` (Python `__bool__`).
127    pub fn is_success(&self) -> bool {
128        matches!(
129            self.status,
130            RememberStatus::Completed | RememberStatus::SessionStored
131        )
132    }
133
134    /// `true` always — every `RememberStatus` variant is terminal.
135    ///
136    /// `remember()` is synchronous; the result is always in a terminal state
137    /// by the time the function returns.
138    pub fn done(&self) -> bool {
139        true
140    }
141}
142
143impl fmt::Display for RememberResult {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        write!(
146            f,
147            "RememberResult(status={:?}, dataset={:?}",
148            self.status, self.dataset_name
149        )?;
150        if let Some(ref ids) = self.session_ids {
151            if ids.len() == 1 {
152                write!(f, ", session_id={:?}", ids[0])?;
153            } else {
154                write!(f, ", session_ids={ids:?}")?;
155            }
156        }
157        if let Some(id) = self.dataset_id {
158            write!(f, ", dataset_id={id}")?;
159        }
160        if let Some(id) = self.pipeline_run_id {
161            write!(f, ", pipeline_run_id={id}")?;
162        }
163        if self.items_processed > 0 {
164            write!(f, ", items={}", self.items_processed)?;
165        }
166        if let Some(ref h) = self.content_hash {
167            write!(f, ", content_hash={h:?}")?;
168        }
169        if let Some(elapsed) = self.elapsed_seconds {
170            write!(f, ", elapsed={elapsed:.1}s")?;
171        }
172        if let Some(ref e) = self.error {
173            write!(f, ", error={e:?}")?;
174        }
175        write!(f, ")")
176    }
177}
178
179// ---------------------------------------------------------------------------
180// Public entry point
181// ---------------------------------------------------------------------------
182
183/// One-call add + cognify + optional improve.
184///
185/// **Permanent Memory Mode** (no `session_id`):
186/// 1. `add()` to ingest data.
187/// 2. `cognify()` to extract knowledge graph.
188/// 3. If `self_improvement=true`, `memify()` to enrich with triplet embeddings.
189///
190/// **Session Memory Mode** (with `session_id`):
191/// 1. Convert data inputs to text.
192/// 2. Store in session cache as Q&A entry.
193/// 3. If `self_improvement=true`, run `improve(session_ids=[session_id])`
194///    inline. Failures are logged but never surface as an error to the caller
195///    (matches Python `_session_improve()` semantics).
196///
197/// This function is strictly synchronous — it always returns a
198/// fully-populated [`RememberResult`]. Background dispatch is a host-side
199/// concern (e.g. the HTTP server via `PipelineRunRegistry::register_background`).
200#[allow(clippy::too_many_arguments)]
201pub async fn remember(
202    data: Vec<DataInput>,
203    dataset_name: &str,
204    session_id: Option<&str>,
205    self_improvement: bool,
206    owner_id: Uuid,
207    tenant_id: Option<Uuid>,
208    add_pipeline: Arc<AddPipeline>,
209    llm: Arc<dyn Llm>,
210    storage: Arc<dyn StorageTrait>,
211    graph_db: Arc<dyn GraphDBTrait>,
212    vector_db: Arc<dyn VectorDB>,
213    embedding_engine: Arc<dyn EmbeddingEngine>,
214    db: Option<Arc<DatabaseConnection>>,
215    session_store: Option<Arc<dyn SessionStore>>,
216    session_manager: Option<Arc<SessionManager>>,
217    checkpoint_store: Option<Arc<dyn CheckpointStore>>,
218    ontology_resolver: Arc<dyn OntologyResolver>,
219    cognify_config: Arc<CognifyConfig>,
220) -> Result<RememberResult, ApiError> {
221    let start = Instant::now();
222
223    // Mirrors Python `send_telemetry("cognee.remember", ...)` from
224    // cognee/api/v1/remember/remember.py:624.
225    #[cfg(feature = "telemetry")]
226    {
227        let data_size_bytes: usize = data
228            .iter()
229            .map(|d| match d {
230                DataInput::Text(s) => s.len(),
231                _ => 0,
232            })
233            .sum();
234        let item_count = data.len();
235        let mode = if session_id.is_some() {
236            "session"
237        } else {
238            "permanent"
239        };
240        cognee_telemetry::send_telemetry(
241            "cognee.remember",
242            owner_id,
243            Some(serde_json::json!({
244                "mode": mode,
245                "data_size_bytes": data_size_bytes,
246                "item_count": item_count,
247                "session_id": session_id,
248            })),
249        );
250    }
251
252    // -- Session Memory Mode --
253    if let Some(sid) = session_id {
254        return remember_session(
255            &data,
256            dataset_name,
257            sid,
258            self_improvement,
259            owner_id,
260            tenant_id,
261            add_pipeline,
262            llm,
263            storage,
264            graph_db,
265            vector_db,
266            embedding_engine,
267            db,
268            session_store,
269            session_manager,
270            checkpoint_store,
271            ontology_resolver,
272            cognify_config,
273            start,
274        )
275        .await;
276    }
277
278    // -- Permanent Memory Mode --
279    remember_permanent_blocking(
280        data,
281        dataset_name,
282        self_improvement,
283        owner_id,
284        tenant_id,
285        &add_pipeline,
286        llm,
287        storage,
288        graph_db,
289        vector_db,
290        embedding_engine,
291        db,
292        ontology_resolver,
293        &cognify_config,
294        start,
295    )
296    .await
297}
298
299// ---------------------------------------------------------------------------
300// Permanent mode: blocking
301// ---------------------------------------------------------------------------
302
/// Outcome of the permanent-mode pipeline.
///
/// Intermediate value handed from the pipeline runner to the code that
/// builds the public [`RememberResult`].
struct PermanentOutcome {
    /// Dataset id derived from dataset name, owner and tenant.
    dataset_id: Uuid,
    /// Per-call run id (synthesized; see the runner for why).
    pipeline_run_id: Uuid,
    /// One entry per item returned by ingestion.
    items: Vec<RememberItemInfo>,
    /// `items.len()`, captured before `items` is moved.
    items_processed: usize,
    /// Content hash of the first ingested item, if any.
    content_hash: Option<String>,
    /// Output of the cognify phase.
    cognify_result: CognifyResult,
    /// Output of memify; `None` when self-improvement was off or memify failed.
    memify_result: Option<MemifyResult>,
}
313
314#[allow(clippy::too_many_arguments)]
315async fn run_permanent_inner(
316    data: Vec<DataInput>,
317    dataset_name: &str,
318    self_improvement: bool,
319    owner_id: Uuid,
320    tenant_id: Option<Uuid>,
321    add_pipeline: &AddPipeline,
322    llm: Arc<dyn Llm>,
323    storage: Arc<dyn StorageTrait>,
324    graph_db: Arc<dyn GraphDBTrait>,
325    vector_db: Arc<dyn VectorDB>,
326    embedding_engine: Arc<dyn EmbeddingEngine>,
327    db: Option<Arc<DatabaseConnection>>,
328    ontology_resolver: Arc<dyn OntologyResolver>,
329    cognify_config: &CognifyConfig,
330) -> Result<PermanentOutcome, ApiError> {
331    let data_items = add_pipeline
332        .add(data, dataset_name, owner_id, tenant_id)
333        .await
334        .map_err(|e| ApiError::Ingestion(e.to_string()))?;
335
336    let items: Vec<RememberItemInfo> = data_items
337        .iter()
338        .map(|d| RememberItemInfo {
339            id: Some(d.id),
340            name: Some(d.name.clone()),
341            content_hash: Some(d.content_hash.clone()),
342            token_count: (d.token_count >= 0).then_some(d.token_count),
343            data_size: (d.data_size >= 0).then_some(d.data_size),
344            mime_type: Some(d.mime_type.clone()),
345        })
346        .collect();
347
348    let content_hash_first = items.first().and_then(|i| i.content_hash.clone());
349    let items_processed = items.len();
350
351    let dataset_id = cognee_ingestion::generate_dataset_id(dataset_name, owner_id, tenant_id);
352    // The Rust cognify pipeline does not expose a pipeline run ID today;
353    // synthesize one per-call to preserve Python API parity.
354    let pipeline_run_id = Uuid::new_v4();
355
356    // OSS build has no DB-backed user lookup (the `users` table is owned by
357    // the closed cloud build), so we always fall back to `None`. `cognify()`
358    // then uses `user_id.to_string()` as the provenance stamp, matching
359    // Python's unauthenticated-run behaviour.
360    let user_email: Option<String> = None;
361
362    // Clone the optional DB handle so memify (which now requires it per
363    // LIB-06 Decision 1) can still reach the relational connection after
364    // cognify consumes its copy.
365    let db_for_memify = db.clone();
366
367    // LIB-06-03: `cognify()` now requires `Arc<DatabaseConnection>` and an
368    // `Arc<dyn CpuPool>` (Decision 1).
369    let database = db
370        .clone()
371        .ok_or_else(|| ApiError::Cognify("cognify requires a DatabaseConnection".to_string()))?;
372    let thread_pool: Arc<dyn cognee_core::CpuPool> = Arc::new(
373        cognee_core::RayonThreadPool::with_default_threads()
374            .map_err(|e| ApiError::Cognify(format!("failed to construct thread pool: {e}")))?,
375    );
376
377    // Cognify.
378    // Gap 08-07: persist the four-state `pipeline_runs` trail through the
379    // real SeaORM repo when a database is available; embedded callers fall
380    // back to the no-op repo.
381    let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
382        Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&database)));
383    let cognify_result = cognify(
384        data_items,
385        dataset_id,
386        Some(owner_id),
387        user_email,
388        tenant_id,
389        llm,
390        storage,
391        Arc::clone(&graph_db),
392        Arc::clone(&vector_db),
393        Arc::clone(&embedding_engine),
394        database,
395        Arc::clone(&pipeline_run_repo),
396        thread_pool,
397        ontology_resolver,
398        cognify_config,
399    )
400    .await
401    .map_err(|e| ApiError::Cognify(e.to_string()))?;
402
403    // Optional self-improvement via memify.
404    let memify_result = if self_improvement {
405        let config = MemifyConfig::default();
406        match db_for_memify {
407            Some(database) => match cognee_core::RayonThreadPool::with_default_threads() {
408                Ok(pool) => {
409                    let thread_pool: Arc<dyn cognee_core::CpuPool> = Arc::new(pool);
410                    let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
411                        Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&database)));
412                    match run_memify(
413                        Arc::clone(&graph_db),
414                        Arc::clone(&vector_db),
415                        Arc::clone(&embedding_engine),
416                        thread_pool,
417                        database,
418                        pipeline_run_repo,
419                        Some(dataset_id),
420                        Some(owner_id),
421                        tenant_id,
422                        &config,
423                    )
424                    .await
425                    {
426                        Ok(r) => Some(r),
427                        Err(e) => {
428                            warn!("memify phase failed (non-fatal): {e}");
429                            None
430                        }
431                    }
432                }
433                Err(e) => {
434                    warn!("memify phase skipped (non-fatal): rayon pool init: {e}");
435                    None
436                }
437            },
438            None => {
439                warn!(
440                    "memify phase skipped: a relational database connection is required by the \
441                     LIB-06 executor-routed memify"
442                );
443                None
444            }
445        }
446    } else {
447        None
448    };
449
450    Ok(PermanentOutcome {
451        dataset_id,
452        pipeline_run_id,
453        items,
454        items_processed,
455        content_hash: content_hash_first,
456        cognify_result,
457        memify_result,
458    })
459}
460
461#[allow(clippy::too_many_arguments)]
462async fn remember_permanent_blocking(
463    data: Vec<DataInput>,
464    dataset_name: &str,
465    self_improvement: bool,
466    owner_id: Uuid,
467    tenant_id: Option<Uuid>,
468    add_pipeline: &AddPipeline,
469    llm: Arc<dyn Llm>,
470    storage: Arc<dyn StorageTrait>,
471    graph_db: Arc<dyn GraphDBTrait>,
472    vector_db: Arc<dyn VectorDB>,
473    embedding_engine: Arc<dyn EmbeddingEngine>,
474    db: Option<Arc<DatabaseConnection>>,
475    ontology_resolver: Arc<dyn OntologyResolver>,
476    cognify_config: &CognifyConfig,
477    start: Instant,
478) -> Result<RememberResult, ApiError> {
479    let outcome = run_permanent_inner(
480        data,
481        dataset_name,
482        self_improvement,
483        owner_id,
484        tenant_id,
485        add_pipeline,
486        llm,
487        storage,
488        graph_db,
489        vector_db,
490        embedding_engine,
491        db,
492        ontology_resolver,
493        cognify_config,
494    )
495    .await?;
496
497    let elapsed = start.elapsed().as_secs_f64();
498
499    Ok(RememberResult {
500        status: RememberStatus::Completed,
501        dataset_name: dataset_name.to_string(),
502        dataset_id: Some(outcome.dataset_id),
503        session_ids: None,
504        pipeline_run_id: Some(outcome.pipeline_run_id),
505        elapsed_seconds: Some(elapsed),
506        content_hash: outcome.content_hash,
507        items_processed: outcome.items_processed,
508        items: outcome.items,
509        error: None,
510        entry_type: None,
511        entry_id: None,
512        cognify_result: Some(outcome.cognify_result),
513        memify_result: outcome.memify_result,
514    })
515}
516
517// ---------------------------------------------------------------------------
518// Session mode
519// ---------------------------------------------------------------------------
520
521/// Session-mode remember: store data as Q&A text in the session cache.
522///
523/// When `self_improvement=true`, runs `improve()` inline (synchronously).
524/// Session-improve failures are logged but never propagated — matches Python's
525/// `_session_improve()` semantics.
526#[allow(clippy::too_many_arguments)]
527async fn remember_session(
528    data: &[DataInput],
529    dataset_name: &str,
530    session_id: &str,
531    self_improvement: bool,
532    owner_id: Uuid,
533    tenant_id: Option<Uuid>,
534    add_pipeline: Arc<AddPipeline>,
535    llm: Arc<dyn Llm>,
536    storage: Arc<dyn StorageTrait>,
537    graph_db: Arc<dyn GraphDBTrait>,
538    vector_db: Arc<dyn VectorDB>,
539    embedding_engine: Arc<dyn EmbeddingEngine>,
540    db: Option<Arc<DatabaseConnection>>,
541    session_store: Option<Arc<dyn SessionStore>>,
542    session_manager: Option<Arc<SessionManager>>,
543    checkpoint_store: Option<Arc<dyn CheckpointStore>>,
544    ontology_resolver: Arc<dyn OntologyResolver>,
545    cognify_config: Arc<CognifyConfig>,
546    start: Instant,
547) -> Result<RememberResult, ApiError> {
548    let store = session_store.clone().ok_or_else(|| {
549        ApiError::InvalidArgument(
550            "session_id provided but no session_store is available".to_string(),
551        )
552    })?;
553
554    // Convert data inputs to text representation.
555    let texts: Vec<String> = data
556        .iter()
557        .map(|di| match di {
558            DataInput::Text(t) => t.clone(),
559            DataInput::FilePath(p) => format!("[file: {p}]"),
560            other => format!("{other:?}"),
561        })
562        .collect();
563
564    let combined_text = texts.join("\n\n");
565    let user_id_str = owner_id.to_string();
566
567    // Store as a Q&A entry (question="" since this is ingestion, not a query).
568    store
569        .create_qa_entry(session_id, Some(&user_id_str), "", &combined_text, None)
570        .await?;
571
572    info!(
573        session_id = session_id,
574        text_len = combined_text.len(),
575        "remember: stored data in session cache"
576    );
577
578    // Optional self-improvement via improve() — inline (synchronous).
579    let mut improve_error: Option<String> = None;
580    if self_improvement {
581        let improve_result = improve(crate::api::improve::ImproveParams {
582            dataset_name: dataset_name.to_string(),
583            session_ids: Some(vec![session_id.to_string()]),
584            node_name: None,
585            owner_id,
586            tenant_id,
587            feedback_alpha: 0.1, // default feedback_alpha
588            llm,
589            storage,
590            graph_db,
591            vector_db,
592            embedding_engine,
593            ontology_resolver,
594            db,
595            session_store,
596            session_manager,
597            add_pipeline: Some(add_pipeline.as_ref()),
598            checkpoint_store,
599            cognify_config: &cognify_config,
600            // E-05: v2 power-user fields not exercised by remember()'s
601            // internal session-improve path.
602            extraction_tasks: None,
603            enrichment_tasks: None,
604            data: None,
605            // Global context index and background mode are not used by the
606            // inline remember() improve path.
607            build_global_context_index: false,
608            run_in_background: false,
609        })
610        .await;
611
612        match improve_result {
613            Ok(_) => {
614                info!(
615                    session_id = session_id,
616                    "remember: session bridged to permanent graph"
617                );
618            }
619            Err(e) => {
620                // Session-improve failures are non-fatal — record and log.
621                let msg = e.to_string();
622                warn!(
623                    session_id = session_id,
624                    "remember: session improve failed (non-fatal): {msg}"
625                );
626                improve_error = Some(msg);
627            }
628        }
629    }
630
631    let elapsed = start.elapsed().as_secs_f64();
632
633    Ok(RememberResult {
634        status: RememberStatus::SessionStored,
635        dataset_name: dataset_name.to_string(),
636        dataset_id: None,
637        session_ids: Some(vec![session_id.to_string()]),
638        pipeline_run_id: None,
639        elapsed_seconds: Some(elapsed),
640        content_hash: None,
641        items_processed: data.len(),
642        items: vec![],
643        error: improve_error,
644        entry_type: None,
645        entry_id: None,
646        cognify_result: None,
647        memify_result: None,
648    })
649}
650
651// ---------------------------------------------------------------------------
652// Typed-entry dispatch (`remember_entry`) — LIB-01 / Decision 2 / Decision 5
653// ---------------------------------------------------------------------------
654
655/// Dispatch a typed [`MemoryEntry`] to the appropriate `SessionManager`
656/// method.
657///
658/// Mirrors Python's `_dispatch_session_entry` at
659/// `cognee/api/v1/remember/remember.py:190-313`. The `entry_type` /
660/// `entry_id` fields on the returned [`RememberResult`] are populated for
661/// **all three** branches; the HTTP DTO at `crates/http-server/src/dto/`
662/// (E-02) carries them through to the wire.
663///
664/// **Behavior**:
665/// - Empty `session_id` returns `Err(ApiError::InvalidArgument)` (Python
666///   parity: `ValueError` → HTTP 400 at the handler boundary).
667/// - Best-effort pre-upsert via [`SessionLifecycleDb::ensure_and_touch_session`];
668///   any failure is logged at `debug` level and swallowed (Python parity:
669///   `try/except` around the pre-upsert at `remember.py:232-253`).
670/// - `MemoryEntry::Qa` → [`SessionManager::save_qa`]; if any of
671///   `feedback_text` / `feedback_score` / `used_graph_element_ids` is set,
672///   a follow-up [`SessionManager::update_qa`] applies the partial update.
673///   `entry_type = "qa"`, `entry_id = qa_id`.
674/// - `MemoryEntry::Trace` → [`SessionManager::add_agent_trace_step`].
675///   `method_params.unwrap_or(Value::Null)` is passed because the Rust
676///   signature requires a non-`Option` value. Feedback generation happens
677///   inside `SessionManager` based on `generate_feedback_with_llm` and
678///   whether an LLM is wired. `entry_type = "trace"`, `entry_id = trace_id`.
679/// - `MemoryEntry::Feedback` → [`SessionManager::add_feedback`]. On
680///   `Ok(true)` the result reports `RememberStatus::SessionStored` with
681///   `entry_id = qa_id`. On `Ok(false)` the result reports
682///   `RememberStatus::Errored` with `error = Some("add_feedback: QA <id>
683///   not found in session <sid>")`. `entry_type = "feedback"`.
684///
685/// **LLM feedback**: When `TraceEntry::generate_feedback_with_llm` is `true`
686/// and `llm` is `Some`, the trace entry's `session_feedback` is produced by
687/// calling the LLM with the
688/// `agent_trace_feedback_summary_system` prompt (Python parity); the call is
689/// bounded by an 8-second timeout. On timeout, LLM error, missing `llm`
690/// handle, or empty `method_return_value`, the implementation falls back to
691/// the deterministic Python-parity strings (`"<origin> succeeded."` /
692/// `"<origin> failed. Reason: ..."` / `"<origin> failed."`). When
693/// `generate_feedback_with_llm` is `false`, the deterministic fallback is
694/// recorded regardless — also Python parity
695/// (`session_manager.py:289-294`).
696#[allow(clippy::too_many_arguments)]
697pub async fn remember_entry(
698    entry: MemoryEntry,
699    dataset_name: &str,
700    session_id: &str,
701    owner_id: Uuid,
702    _tenant_id: Option<Uuid>,
703    db: Option<Arc<DatabaseConnection>>,
704    _session_store: Option<Arc<dyn SessionStore>>,
705    session_manager: Option<Arc<SessionManager>>,
706    llm: Option<Arc<dyn Llm>>,
707) -> Result<RememberResult, ApiError> {
708    let start = Instant::now();
709
710    if session_id.is_empty() {
711        return Err(ApiError::InvalidArgument(
712            "session_id is required for typed memory entries".to_string(),
713        ));
714    }
715
716    let base_session_manager = session_manager.ok_or_else(|| {
717        ApiError::InvalidArgument("SessionManager is required for typed memory entries".to_string())
718    })?;
719    let sm = if let Some(llm) = llm.clone() {
720        Arc::new(base_session_manager.as_ref().clone().with_llm(llm))
721    } else {
722        base_session_manager
723    };
724
725    // Best-effort pre-upsert of the session_records row. Mirrors Python's
726    // try/except at remember.py:232-253 — any failure is logged at debug
727    // and we proceed without a dataset_id binding.
728    if let Some(ref database) = db
729        && let Err(exc) = SessionLifecycleDb::ensure_and_touch_session(
730            database.as_ref(),
731            session_id,
732            owner_id,
733            None,
734        )
735        .await
736    {
737        debug!(
738            session_id = session_id,
739            "remember_entry: pre-upsert session_record failed (non-fatal): {exc}"
740        );
741    }
742
743    let user_id_str = owner_id.to_string();
744    let entry_type_str = entry.type_str();
745
746    let mut status = RememberStatus::SessionStored;
747    let entry_id: Option<String>;
748    let mut error: Option<String> = None;
749
750    match entry {
751        MemoryEntry::Qa(q) => {
752            let QAEntry {
753                question,
754                answer,
755                context,
756                feedback_text,
757                feedback_score,
758                used_graph_element_ids,
759            } = q;
760
761            let qa_id = sm
762                .save_qa(
763                    Some(session_id),
764                    Some(&user_id_str),
765                    &question,
766                    &answer,
767                    Some(context.as_str()),
768                    // used_graph_element_ids handled by the follow-up update below
769                    // when the raw JSON value needs schema-validation first.
770                    None,
771                )
772                .await?;
773
774            // Follow-up partial update when any of the optional fields are
775            // present. Composes existing methods rather than widening
776            // `save_qa`'s public signature (see task §3 rationale).
777            if feedback_text.is_some()
778                || feedback_score.is_some()
779                || used_graph_element_ids.is_some()
780            {
781                let used_graph_element_ids_typed = match used_graph_element_ids {
782                    Some(value) => Some(Some(serde_json::from_value(value).map_err(|e| {
783                        ApiError::InvalidArgument(format!(
784                            "used_graph_element_ids does not match {{node_ids:[], edge_ids:[]}} shape: {e}"
785                        ))
786                    })?)),
787                    None => None,
788                };
789
790                let updates = SessionQAUpdate {
791                    feedback_text: feedback_text.map(Some),
792                    feedback_score: feedback_score.map(Some),
793                    used_graph_element_ids: used_graph_element_ids_typed,
794                    ..Default::default()
795                };
796
797                sm.update_qa(Some(session_id), Some(&user_id_str), &qa_id, updates)
798                    .await?;
799            }
800
801            entry_id = Some(qa_id);
802        }
803
804        MemoryEntry::Trace(t) => {
805            let TraceEntry {
806                origin_function,
807                status: trace_status,
808                method_params,
809                method_return_value,
810                memory_query,
811                memory_context,
812                error_message,
813                generate_feedback_with_llm,
814            } = t;
815
816            let trace_id = sm
817                .add_agent_trace_step(
818                    &user_id_str,
819                    Some(session_id),
820                    &origin_function,
821                    &trace_status,
822                    &memory_query,
823                    &memory_context,
824                    method_params.unwrap_or(serde_json::Value::Null),
825                    method_return_value,
826                    &error_message,
827                    generate_feedback_with_llm,
828                )
829                .await?;
830
831            entry_id = Some(trace_id);
832        }
833
834        MemoryEntry::Feedback(f) => {
835            let FeedbackEntry {
836                qa_id,
837                feedback_text,
838                feedback_score,
839            } = f;
840
841            let ok = sm
842                .add_feedback(
843                    Some(session_id),
844                    Some(&user_id_str),
845                    &qa_id,
846                    feedback_text.as_deref(),
847                    feedback_score,
848                )
849                .await?;
850
851            if !ok {
852                status = RememberStatus::Errored;
853                error = Some(format!(
854                    "add_feedback: QA {qa_id} not found in session {session_id}"
855                ));
856            }
857            // Python parity: entry_id is set to the input qa_id even on
858            // not-found (remember.py:307: `result.entry_id = entry.qa_id`).
859            entry_id = Some(qa_id);
860        }
861    }
862
863    info!(
864        session_id = session_id,
865        entry_type = entry_type_str,
866        entry_id = entry_id.as_deref().unwrap_or(""),
867        status = ?status,
868        "remember_entry: dispatched typed memory entry"
869    );
870
871    Ok(RememberResult {
872        status,
873        dataset_name: dataset_name.to_string(),
874        dataset_id: None,
875        session_ids: Some(vec![session_id.to_string()]),
876        pipeline_run_id: None,
877        elapsed_seconds: Some(start.elapsed().as_secs_f64()),
878        content_hash: None,
879        items_processed: 0,
880        items: vec![],
881        error,
882        entry_type: Some(entry_type_str.to_string()),
883        entry_id,
884        cognify_result: None,
885        memify_result: None,
886    })
887}
888
889// ---------------------------------------------------------------------------
890// Unit tests
891// ---------------------------------------------------------------------------
892
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    //! Unit tests for the `RememberStatus` vocabulary and the `RememberResult`
    //! presentation helpers (`to_dict`, `Display`, `is_success`, `done`).

    use super::*;

    /// Every `RememberStatus` variant paired with its JSON wire form.
    ///
    /// Decision 15 (LIB-06): the library status emits CamelCase
    /// `"PipelineRun*"` / `"SessionStored"`, matching the
    /// `cognee_core::PipelineRunStatus` family. HTTP-side translation (E-01)
    /// maps these to Python's lowercase wire format; that mapping is not
    /// tested here.
    const STATUS_WIRE_NAMES: [(RememberStatus, &str); 4] = [
        (RememberStatus::Started, "\"PipelineRunStarted\""),
        (RememberStatus::Completed, "\"PipelineRunCompleted\""),
        (RememberStatus::Errored, "\"PipelineRunErrored\""),
        (RememberStatus::SessionStored, "\"SessionStored\""),
    ];

    #[test]
    fn remember_status_serde_roundtrip_errored() {
        let json = serde_json::to_string(&RememberStatus::Errored).expect("serialize");
        assert_eq!(json, "\"PipelineRunErrored\"");
        let back: RememberStatus = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back, RememberStatus::Errored);
    }

    #[test]
    fn remember_status_serializes_to_pipeline_run_camelcase() {
        for (status, wire) in STATUS_WIRE_NAMES {
            let got = serde_json::to_string(&status).expect("ser");
            assert_eq!(got, wire, "for status {status:?}");
        }
    }

    #[test]
    fn remember_status_deserializes_from_pipeline_run_camelcase() {
        for (expected, wire) in STATUS_WIRE_NAMES {
            let got: RememberStatus = serde_json::from_str(wire).expect("deserialize");
            assert_eq!(got, expected, "for input {wire}");
        }
    }

    /// Expected `PipelineRunStatus` -> `RememberStatus` translation.
    ///
    /// Deliberately an exhaustive `match` with no wildcard arm. Adding a
    /// variant to `PipelineRunStatus` makes this fail to compile until the
    /// expected mapping for the new variant is written down here.
    fn expected_remember_status(
        status: &cognee_core::pipeline::PipelineRunStatus,
    ) -> RememberStatus {
        use cognee_core::pipeline::PipelineRunStatus;
        match status {
            // The SDK collapses "initiated" and "started" into one
            // non-terminal state.
            PipelineRunStatus::Initiated | PipelineRunStatus::Started => {
                RememberStatus::Started
            }
            PipelineRunStatus::Completed => RememberStatus::Completed,
            PipelineRunStatus::Errored => RememberStatus::Errored,
        }
    }

    #[test]
    fn remember_status_from_pipeline_run_status_translation_table() {
        use cognee_core::pipeline::PipelineRunStatus;
        for status in [
            PipelineRunStatus::Initiated,
            PipelineRunStatus::Started,
            PipelineRunStatus::Completed,
            PipelineRunStatus::Errored,
        ] {
            // Compute the expectation before `from` consumes `status`.
            let expected = expected_remember_status(&status);
            assert_eq!(RememberStatus::from(status), expected);
        }
    }

    #[test]
    fn remember_result_elapsed_seconds_serializes_as_null_when_none() {
        let mut result = sample_result(RememberStatus::Completed);
        result.elapsed_seconds = None;
        let dict = result.to_dict();
        let obj = dict.as_object().expect("object");
        assert!(
            obj.contains_key("elapsed_seconds"),
            "elapsed_seconds key should be present even when None (Python parity)"
        );
        assert!(
            obj.get("elapsed_seconds").is_some_and(|v| v.is_null()),
            "elapsed_seconds should serialize as null when None"
        );
    }

    #[test]
    fn is_success_completed_and_session_stored() {
        let mut result = sample_result(RememberStatus::Completed);
        assert!(result.is_success());
        assert!(result.done());

        result.status = RememberStatus::SessionStored;
        assert!(result.is_success());
        assert!(result.done());
    }

    #[test]
    fn is_success_errored() {
        let result = sample_result(RememberStatus::Errored);
        assert!(!result.is_success());
        // Errored is terminal: the run is finished even though it failed.
        assert!(result.done());
    }

    #[test]
    fn all_statuses_are_done() {
        // Covers every status the synchronous SDK can return. `Started` is
        // intentionally not listed: `remember` never returns it (it always
        // reports a terminal state), so its `done()` value is not pinned here.
        for status in [
            RememberStatus::Completed,
            RememberStatus::Errored,
            RememberStatus::SessionStored,
        ] {
            let result = sample_result(status);
            assert!(result.done(), "expected done() == true for {status:?}");
        }
    }

    #[test]
    fn display_format_has_status_and_dataset() {
        let text = format!("{}", sample_result(RememberStatus::Completed));
        assert!(text.contains("RememberResult("));
        assert!(text.contains("status=Completed"));
        assert!(text.contains("dataset="));
        assert!(text.ends_with(')'));
    }

    #[test]
    fn to_dict_omits_skipped_fields() {
        let dict = sample_result(RememberStatus::Completed).to_dict();
        assert!(dict.is_object());
        let obj = dict.as_object().expect("object");
        assert!(obj.contains_key("status"));
        assert!(obj.contains_key("dataset_name"));
        // cognify_result / memify_result are #[serde(skip)]
        assert!(!obj.contains_key("cognify_result"));
        assert!(!obj.contains_key("memify_result"));
    }

    #[test]
    fn display_formats_single_session_id() {
        let mut result = sample_result(RememberStatus::SessionStored);
        result.session_ids = Some(vec!["sess-123".to_string()]);
        let text = format!("{result}");
        assert!(text.contains("session_id=\"sess-123\""));
        assert!(!text.contains("session_ids="));
    }

    /// Builds a minimal `RememberResult` with the given `status`, dataset
    /// `"main_dataset"`, `elapsed_seconds = Some(1.23)`, and every other
    /// optional field empty.
    fn sample_result(status: RememberStatus) -> RememberResult {
        RememberResult {
            status,
            dataset_name: "main_dataset".to_string(),
            dataset_id: None,
            session_ids: None,
            pipeline_run_id: None,
            elapsed_seconds: Some(1.23),
            content_hash: None,
            items_processed: 0,
            items: Vec::new(),
            error: None,
            entry_type: None,
            entry_id: None,
            cognify_result: None,
            memify_result: None,
        }
    }
}