1use std::fmt;
9use std::sync::Arc;
10use std::time::Instant;
11
12use cognee_cognify::cognify;
13use cognee_cognify::{CognifyConfig, CognifyResult, MemifyConfig, MemifyResult, run_memify};
14use cognee_database::{
15 CheckpointStore, DatabaseConnection, PipelineRunRepository, SeaOrmPipelineRunRepository,
16 SessionLifecycleDb,
17};
18use cognee_embedding::EmbeddingEngine;
19use cognee_graph::GraphDBTrait;
20use cognee_ingestion::AddPipeline;
21use cognee_llm::Llm;
22use cognee_models::{DataInput, FeedbackEntry, MemoryEntry, QAEntry, TraceEntry};
23use cognee_ontology::OntologyResolver;
24use cognee_session::{SessionManager, SessionQAUpdate, SessionStore};
25use cognee_storage::StorageTrait;
26use cognee_vector::VectorDB;
27use serde::{Deserialize, Serialize};
28use tracing::{debug, info, warn};
29use uuid::Uuid;
30
31use super::error::ApiError;
32use super::improve::improve;
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
43pub enum RememberStatus {
44 #[serde(rename = "PipelineRunStarted")]
51 Started,
52 #[serde(rename = "PipelineRunCompleted")]
54 Completed,
55 #[serde(rename = "PipelineRunErrored")]
57 Errored,
58 #[serde(rename = "SessionStored")]
60 SessionStored,
61}
62
63impl From<cognee_core::pipeline::PipelineRunStatus> for RememberStatus {
64 fn from(s: cognee_core::pipeline::PipelineRunStatus) -> Self {
65 use cognee_core::pipeline::PipelineRunStatus;
66 match s {
67 PipelineRunStatus::Initiated | PipelineRunStatus::Started => Self::Started,
68 PipelineRunStatus::Completed => Self::Completed,
69 PipelineRunStatus::Errored => Self::Errored,
70 }
71 }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct RememberItemInfo {
77 pub id: Option<Uuid>,
78 pub name: Option<String>,
79 pub content_hash: Option<String>,
80 pub token_count: Option<i64>,
82 pub data_size: Option<i64>,
84 pub mime_type: Option<String>,
85}
86
87#[derive(Debug, Clone, Serialize)]
92pub struct RememberResult {
93 pub status: RememberStatus,
94 pub dataset_name: String,
95 pub dataset_id: Option<Uuid>,
96 pub session_ids: Option<Vec<String>>,
97 pub pipeline_run_id: Option<Uuid>,
98 pub elapsed_seconds: Option<f64>,
102 pub content_hash: Option<String>,
104 pub items_processed: usize,
105 pub items: Vec<RememberItemInfo>,
106 pub error: Option<String>,
107 pub entry_type: Option<String>,
111 pub entry_id: Option<String>,
114 #[serde(skip)]
115 pub cognify_result: Option<CognifyResult>,
116 #[serde(skip)]
117 pub memify_result: Option<MemifyResult>,
118}
119
120impl RememberResult {
121 pub fn to_dict(&self) -> serde_json::Value {
123 serde_json::to_value(self).unwrap_or(serde_json::Value::Null)
124 }
125
126 pub fn is_success(&self) -> bool {
128 matches!(
129 self.status,
130 RememberStatus::Completed | RememberStatus::SessionStored
131 )
132 }
133
134 pub fn done(&self) -> bool {
139 true
140 }
141}
142
143impl fmt::Display for RememberResult {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 write!(
146 f,
147 "RememberResult(status={:?}, dataset={:?}",
148 self.status, self.dataset_name
149 )?;
150 if let Some(ref ids) = self.session_ids {
151 if ids.len() == 1 {
152 write!(f, ", session_id={:?}", ids[0])?;
153 } else {
154 write!(f, ", session_ids={ids:?}")?;
155 }
156 }
157 if let Some(id) = self.dataset_id {
158 write!(f, ", dataset_id={id}")?;
159 }
160 if let Some(id) = self.pipeline_run_id {
161 write!(f, ", pipeline_run_id={id}")?;
162 }
163 if self.items_processed > 0 {
164 write!(f, ", items={}", self.items_processed)?;
165 }
166 if let Some(ref h) = self.content_hash {
167 write!(f, ", content_hash={h:?}")?;
168 }
169 if let Some(elapsed) = self.elapsed_seconds {
170 write!(f, ", elapsed={elapsed:.1}s")?;
171 }
172 if let Some(ref e) = self.error {
173 write!(f, ", error={e:?}")?;
174 }
175 write!(f, ")")
176 }
177}
178
179#[allow(clippy::too_many_arguments)]
201pub async fn remember(
202 data: Vec<DataInput>,
203 dataset_name: &str,
204 session_id: Option<&str>,
205 self_improvement: bool,
206 owner_id: Uuid,
207 tenant_id: Option<Uuid>,
208 add_pipeline: Arc<AddPipeline>,
209 llm: Arc<dyn Llm>,
210 storage: Arc<dyn StorageTrait>,
211 graph_db: Arc<dyn GraphDBTrait>,
212 vector_db: Arc<dyn VectorDB>,
213 embedding_engine: Arc<dyn EmbeddingEngine>,
214 db: Option<Arc<DatabaseConnection>>,
215 session_store: Option<Arc<dyn SessionStore>>,
216 session_manager: Option<Arc<SessionManager>>,
217 checkpoint_store: Option<Arc<dyn CheckpointStore>>,
218 ontology_resolver: Arc<dyn OntologyResolver>,
219 cognify_config: Arc<CognifyConfig>,
220) -> Result<RememberResult, ApiError> {
221 let start = Instant::now();
222
223 #[cfg(feature = "telemetry")]
226 {
227 let data_size_bytes: usize = data
228 .iter()
229 .map(|d| match d {
230 DataInput::Text(s) => s.len(),
231 _ => 0,
232 })
233 .sum();
234 let item_count = data.len();
235 let mode = if session_id.is_some() {
236 "session"
237 } else {
238 "permanent"
239 };
240 cognee_telemetry::send_telemetry(
241 "cognee.remember",
242 owner_id,
243 Some(serde_json::json!({
244 "mode": mode,
245 "data_size_bytes": data_size_bytes,
246 "item_count": item_count,
247 "session_id": session_id,
248 })),
249 );
250 }
251
252 if let Some(sid) = session_id {
254 return remember_session(
255 &data,
256 dataset_name,
257 sid,
258 self_improvement,
259 owner_id,
260 tenant_id,
261 add_pipeline,
262 llm,
263 storage,
264 graph_db,
265 vector_db,
266 embedding_engine,
267 db,
268 session_store,
269 session_manager,
270 checkpoint_store,
271 ontology_resolver,
272 cognify_config,
273 start,
274 )
275 .await;
276 }
277
278 remember_permanent_blocking(
280 data,
281 dataset_name,
282 self_improvement,
283 owner_id,
284 tenant_id,
285 &add_pipeline,
286 llm,
287 storage,
288 graph_db,
289 vector_db,
290 embedding_engine,
291 db,
292 ontology_resolver,
293 &cognify_config,
294 start,
295 )
296 .await
297}
298
299struct PermanentOutcome {
305 dataset_id: Uuid,
306 pipeline_run_id: Uuid,
307 items: Vec<RememberItemInfo>,
308 items_processed: usize,
309 content_hash: Option<String>,
310 cognify_result: CognifyResult,
311 memify_result: Option<MemifyResult>,
312}
313
314#[allow(clippy::too_many_arguments)]
315async fn run_permanent_inner(
316 data: Vec<DataInput>,
317 dataset_name: &str,
318 self_improvement: bool,
319 owner_id: Uuid,
320 tenant_id: Option<Uuid>,
321 add_pipeline: &AddPipeline,
322 llm: Arc<dyn Llm>,
323 storage: Arc<dyn StorageTrait>,
324 graph_db: Arc<dyn GraphDBTrait>,
325 vector_db: Arc<dyn VectorDB>,
326 embedding_engine: Arc<dyn EmbeddingEngine>,
327 db: Option<Arc<DatabaseConnection>>,
328 ontology_resolver: Arc<dyn OntologyResolver>,
329 cognify_config: &CognifyConfig,
330) -> Result<PermanentOutcome, ApiError> {
331 let data_items = add_pipeline
332 .add(data, dataset_name, owner_id, tenant_id)
333 .await
334 .map_err(|e| ApiError::Ingestion(e.to_string()))?;
335
336 let items: Vec<RememberItemInfo> = data_items
337 .iter()
338 .map(|d| RememberItemInfo {
339 id: Some(d.id),
340 name: Some(d.name.clone()),
341 content_hash: Some(d.content_hash.clone()),
342 token_count: (d.token_count >= 0).then_some(d.token_count),
343 data_size: (d.data_size >= 0).then_some(d.data_size),
344 mime_type: Some(d.mime_type.clone()),
345 })
346 .collect();
347
348 let content_hash_first = items.first().and_then(|i| i.content_hash.clone());
349 let items_processed = items.len();
350
351 let dataset_id = cognee_ingestion::generate_dataset_id(dataset_name, owner_id, tenant_id);
352 let pipeline_run_id = Uuid::new_v4();
355
356 let user_email: Option<String> = None;
361
362 let db_for_memify = db.clone();
366
367 let database = db
370 .clone()
371 .ok_or_else(|| ApiError::Cognify("cognify requires a DatabaseConnection".to_string()))?;
372 let thread_pool: Arc<dyn cognee_core::CpuPool> = Arc::new(
373 cognee_core::RayonThreadPool::with_default_threads()
374 .map_err(|e| ApiError::Cognify(format!("failed to construct thread pool: {e}")))?,
375 );
376
377 let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
382 Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&database)));
383 let cognify_result = cognify(
384 data_items,
385 dataset_id,
386 Some(owner_id),
387 user_email,
388 tenant_id,
389 llm,
390 storage,
391 Arc::clone(&graph_db),
392 Arc::clone(&vector_db),
393 Arc::clone(&embedding_engine),
394 database,
395 Arc::clone(&pipeline_run_repo),
396 thread_pool,
397 ontology_resolver,
398 cognify_config,
399 )
400 .await
401 .map_err(|e| ApiError::Cognify(e.to_string()))?;
402
403 let memify_result = if self_improvement {
405 let config = MemifyConfig::default();
406 match db_for_memify {
407 Some(database) => match cognee_core::RayonThreadPool::with_default_threads() {
408 Ok(pool) => {
409 let thread_pool: Arc<dyn cognee_core::CpuPool> = Arc::new(pool);
410 let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
411 Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&database)));
412 match run_memify(
413 Arc::clone(&graph_db),
414 Arc::clone(&vector_db),
415 Arc::clone(&embedding_engine),
416 thread_pool,
417 database,
418 pipeline_run_repo,
419 Some(dataset_id),
420 Some(owner_id),
421 tenant_id,
422 &config,
423 )
424 .await
425 {
426 Ok(r) => Some(r),
427 Err(e) => {
428 warn!("memify phase failed (non-fatal): {e}");
429 None
430 }
431 }
432 }
433 Err(e) => {
434 warn!("memify phase skipped (non-fatal): rayon pool init: {e}");
435 None
436 }
437 },
438 None => {
439 warn!(
440 "memify phase skipped: a relational database connection is required by the \
441 LIB-06 executor-routed memify"
442 );
443 None
444 }
445 }
446 } else {
447 None
448 };
449
450 Ok(PermanentOutcome {
451 dataset_id,
452 pipeline_run_id,
453 items,
454 items_processed,
455 content_hash: content_hash_first,
456 cognify_result,
457 memify_result,
458 })
459}
460
461#[allow(clippy::too_many_arguments)]
462async fn remember_permanent_blocking(
463 data: Vec<DataInput>,
464 dataset_name: &str,
465 self_improvement: bool,
466 owner_id: Uuid,
467 tenant_id: Option<Uuid>,
468 add_pipeline: &AddPipeline,
469 llm: Arc<dyn Llm>,
470 storage: Arc<dyn StorageTrait>,
471 graph_db: Arc<dyn GraphDBTrait>,
472 vector_db: Arc<dyn VectorDB>,
473 embedding_engine: Arc<dyn EmbeddingEngine>,
474 db: Option<Arc<DatabaseConnection>>,
475 ontology_resolver: Arc<dyn OntologyResolver>,
476 cognify_config: &CognifyConfig,
477 start: Instant,
478) -> Result<RememberResult, ApiError> {
479 let outcome = run_permanent_inner(
480 data,
481 dataset_name,
482 self_improvement,
483 owner_id,
484 tenant_id,
485 add_pipeline,
486 llm,
487 storage,
488 graph_db,
489 vector_db,
490 embedding_engine,
491 db,
492 ontology_resolver,
493 cognify_config,
494 )
495 .await?;
496
497 let elapsed = start.elapsed().as_secs_f64();
498
499 Ok(RememberResult {
500 status: RememberStatus::Completed,
501 dataset_name: dataset_name.to_string(),
502 dataset_id: Some(outcome.dataset_id),
503 session_ids: None,
504 pipeline_run_id: Some(outcome.pipeline_run_id),
505 elapsed_seconds: Some(elapsed),
506 content_hash: outcome.content_hash,
507 items_processed: outcome.items_processed,
508 items: outcome.items,
509 error: None,
510 entry_type: None,
511 entry_id: None,
512 cognify_result: Some(outcome.cognify_result),
513 memify_result: outcome.memify_result,
514 })
515}
516
517#[allow(clippy::too_many_arguments)]
527async fn remember_session(
528 data: &[DataInput],
529 dataset_name: &str,
530 session_id: &str,
531 self_improvement: bool,
532 owner_id: Uuid,
533 tenant_id: Option<Uuid>,
534 add_pipeline: Arc<AddPipeline>,
535 llm: Arc<dyn Llm>,
536 storage: Arc<dyn StorageTrait>,
537 graph_db: Arc<dyn GraphDBTrait>,
538 vector_db: Arc<dyn VectorDB>,
539 embedding_engine: Arc<dyn EmbeddingEngine>,
540 db: Option<Arc<DatabaseConnection>>,
541 session_store: Option<Arc<dyn SessionStore>>,
542 session_manager: Option<Arc<SessionManager>>,
543 checkpoint_store: Option<Arc<dyn CheckpointStore>>,
544 ontology_resolver: Arc<dyn OntologyResolver>,
545 cognify_config: Arc<CognifyConfig>,
546 start: Instant,
547) -> Result<RememberResult, ApiError> {
548 let store = session_store.clone().ok_or_else(|| {
549 ApiError::InvalidArgument(
550 "session_id provided but no session_store is available".to_string(),
551 )
552 })?;
553
554 let texts: Vec<String> = data
556 .iter()
557 .map(|di| match di {
558 DataInput::Text(t) => t.clone(),
559 DataInput::FilePath(p) => format!("[file: {p}]"),
560 other => format!("{other:?}"),
561 })
562 .collect();
563
564 let combined_text = texts.join("\n\n");
565 let user_id_str = owner_id.to_string();
566
567 store
569 .create_qa_entry(session_id, Some(&user_id_str), "", &combined_text, None)
570 .await?;
571
572 info!(
573 session_id = session_id,
574 text_len = combined_text.len(),
575 "remember: stored data in session cache"
576 );
577
578 let mut improve_error: Option<String> = None;
580 if self_improvement {
581 let improve_result = improve(crate::api::improve::ImproveParams {
582 dataset_name: dataset_name.to_string(),
583 session_ids: Some(vec![session_id.to_string()]),
584 node_name: None,
585 owner_id,
586 tenant_id,
587 feedback_alpha: 0.1, llm,
589 storage,
590 graph_db,
591 vector_db,
592 embedding_engine,
593 ontology_resolver,
594 db,
595 session_store,
596 session_manager,
597 add_pipeline: Some(add_pipeline.as_ref()),
598 checkpoint_store,
599 cognify_config: &cognify_config,
600 extraction_tasks: None,
603 enrichment_tasks: None,
604 data: None,
605 build_global_context_index: false,
608 run_in_background: false,
609 })
610 .await;
611
612 match improve_result {
613 Ok(_) => {
614 info!(
615 session_id = session_id,
616 "remember: session bridged to permanent graph"
617 );
618 }
619 Err(e) => {
620 let msg = e.to_string();
622 warn!(
623 session_id = session_id,
624 "remember: session improve failed (non-fatal): {msg}"
625 );
626 improve_error = Some(msg);
627 }
628 }
629 }
630
631 let elapsed = start.elapsed().as_secs_f64();
632
633 Ok(RememberResult {
634 status: RememberStatus::SessionStored,
635 dataset_name: dataset_name.to_string(),
636 dataset_id: None,
637 session_ids: Some(vec![session_id.to_string()]),
638 pipeline_run_id: None,
639 elapsed_seconds: Some(elapsed),
640 content_hash: None,
641 items_processed: data.len(),
642 items: vec![],
643 error: improve_error,
644 entry_type: None,
645 entry_id: None,
646 cognify_result: None,
647 memify_result: None,
648 })
649}
650
651#[allow(clippy::too_many_arguments)]
697pub async fn remember_entry(
698 entry: MemoryEntry,
699 dataset_name: &str,
700 session_id: &str,
701 owner_id: Uuid,
702 _tenant_id: Option<Uuid>,
703 db: Option<Arc<DatabaseConnection>>,
704 _session_store: Option<Arc<dyn SessionStore>>,
705 session_manager: Option<Arc<SessionManager>>,
706 llm: Option<Arc<dyn Llm>>,
707) -> Result<RememberResult, ApiError> {
708 let start = Instant::now();
709
710 if session_id.is_empty() {
711 return Err(ApiError::InvalidArgument(
712 "session_id is required for typed memory entries".to_string(),
713 ));
714 }
715
716 let base_session_manager = session_manager.ok_or_else(|| {
717 ApiError::InvalidArgument("SessionManager is required for typed memory entries".to_string())
718 })?;
719 let sm = if let Some(llm) = llm.clone() {
720 Arc::new(base_session_manager.as_ref().clone().with_llm(llm))
721 } else {
722 base_session_manager
723 };
724
725 if let Some(ref database) = db
729 && let Err(exc) = SessionLifecycleDb::ensure_and_touch_session(
730 database.as_ref(),
731 session_id,
732 owner_id,
733 None,
734 )
735 .await
736 {
737 debug!(
738 session_id = session_id,
739 "remember_entry: pre-upsert session_record failed (non-fatal): {exc}"
740 );
741 }
742
743 let user_id_str = owner_id.to_string();
744 let entry_type_str = entry.type_str();
745
746 let mut status = RememberStatus::SessionStored;
747 let entry_id: Option<String>;
748 let mut error: Option<String> = None;
749
750 match entry {
751 MemoryEntry::Qa(q) => {
752 let QAEntry {
753 question,
754 answer,
755 context,
756 feedback_text,
757 feedback_score,
758 used_graph_element_ids,
759 } = q;
760
761 let qa_id = sm
762 .save_qa(
763 Some(session_id),
764 Some(&user_id_str),
765 &question,
766 &answer,
767 Some(context.as_str()),
768 None,
771 )
772 .await?;
773
774 if feedback_text.is_some()
778 || feedback_score.is_some()
779 || used_graph_element_ids.is_some()
780 {
781 let used_graph_element_ids_typed = match used_graph_element_ids {
782 Some(value) => Some(Some(serde_json::from_value(value).map_err(|e| {
783 ApiError::InvalidArgument(format!(
784 "used_graph_element_ids does not match {{node_ids:[], edge_ids:[]}} shape: {e}"
785 ))
786 })?)),
787 None => None,
788 };
789
790 let updates = SessionQAUpdate {
791 feedback_text: feedback_text.map(Some),
792 feedback_score: feedback_score.map(Some),
793 used_graph_element_ids: used_graph_element_ids_typed,
794 ..Default::default()
795 };
796
797 sm.update_qa(Some(session_id), Some(&user_id_str), &qa_id, updates)
798 .await?;
799 }
800
801 entry_id = Some(qa_id);
802 }
803
804 MemoryEntry::Trace(t) => {
805 let TraceEntry {
806 origin_function,
807 status: trace_status,
808 method_params,
809 method_return_value,
810 memory_query,
811 memory_context,
812 error_message,
813 generate_feedback_with_llm,
814 } = t;
815
816 let trace_id = sm
817 .add_agent_trace_step(
818 &user_id_str,
819 Some(session_id),
820 &origin_function,
821 &trace_status,
822 &memory_query,
823 &memory_context,
824 method_params.unwrap_or(serde_json::Value::Null),
825 method_return_value,
826 &error_message,
827 generate_feedback_with_llm,
828 )
829 .await?;
830
831 entry_id = Some(trace_id);
832 }
833
834 MemoryEntry::Feedback(f) => {
835 let FeedbackEntry {
836 qa_id,
837 feedback_text,
838 feedback_score,
839 } = f;
840
841 let ok = sm
842 .add_feedback(
843 Some(session_id),
844 Some(&user_id_str),
845 &qa_id,
846 feedback_text.as_deref(),
847 feedback_score,
848 )
849 .await?;
850
851 if !ok {
852 status = RememberStatus::Errored;
853 error = Some(format!(
854 "add_feedback: QA {qa_id} not found in session {session_id}"
855 ));
856 }
857 entry_id = Some(qa_id);
860 }
861 }
862
863 info!(
864 session_id = session_id,
865 entry_type = entry_type_str,
866 entry_id = entry_id.as_deref().unwrap_or(""),
867 status = ?status,
868 "remember_entry: dispatched typed memory entry"
869 );
870
871 Ok(RememberResult {
872 status,
873 dataset_name: dataset_name.to_string(),
874 dataset_id: None,
875 session_ids: Some(vec![session_id.to_string()]),
876 pipeline_run_id: None,
877 elapsed_seconds: Some(start.elapsed().as_secs_f64()),
878 content_hash: None,
879 items_processed: 0,
880 items: vec![],
881 error,
882 entry_type: Some(entry_type_str.to_string()),
883 entry_id,
884 cognify_result: None,
885 memify_result: None,
886 })
887}
888
889#[cfg(test)]
894#[allow(
895 clippy::unwrap_used,
896 clippy::expect_used,
897 reason = "test code — panics are acceptable failures"
898)]
899mod tests {
900 use super::*;
901
902 #[test]
903 fn remember_status_serde_roundtrip_errored() {
904 let s = RememberStatus::Errored;
905 let j = serde_json::to_string(&s).expect("serialize");
906 assert_eq!(j, "\"PipelineRunErrored\"");
907 let back: RememberStatus = serde_json::from_str(&j).expect("deserialize");
908 assert_eq!(back, RememberStatus::Errored);
909 }
910
911 #[test]
912 fn remember_status_serializes_to_pipeline_run_camelcase() {
913 assert_eq!(
918 serde_json::to_string(&RememberStatus::Started).expect("ser"),
919 "\"PipelineRunStarted\""
920 );
921 assert_eq!(
922 serde_json::to_string(&RememberStatus::Completed).expect("ser"),
923 "\"PipelineRunCompleted\""
924 );
925 assert_eq!(
926 serde_json::to_string(&RememberStatus::Errored).expect("ser"),
927 "\"PipelineRunErrored\""
928 );
929 assert_eq!(
930 serde_json::to_string(&RememberStatus::SessionStored).expect("ser"),
931 "\"SessionStored\""
932 );
933 }
934
935 #[test]
936 fn remember_status_deserializes_from_pipeline_run_camelcase() {
937 for (s, expected) in [
938 ("\"PipelineRunStarted\"", RememberStatus::Started),
939 ("\"PipelineRunCompleted\"", RememberStatus::Completed),
940 ("\"PipelineRunErrored\"", RememberStatus::Errored),
941 ("\"SessionStored\"", RememberStatus::SessionStored),
942 ] {
943 let got: RememberStatus = serde_json::from_str(s).expect("deserialize");
944 assert_eq!(got, expected, "for input {s}");
945 }
946 }
947
948 #[test]
949 fn remember_status_from_pipeline_run_status_translation_table() {
950 use cognee_core::pipeline::PipelineRunStatus;
951 assert_eq!(
954 RememberStatus::from(PipelineRunStatus::Initiated),
955 RememberStatus::Started
956 );
957 assert_eq!(
958 RememberStatus::from(PipelineRunStatus::Started),
959 RememberStatus::Started
960 );
961 assert_eq!(
962 RememberStatus::from(PipelineRunStatus::Completed),
963 RememberStatus::Completed
964 );
965 assert_eq!(
966 RememberStatus::from(PipelineRunStatus::Errored),
967 RememberStatus::Errored
968 );
969 }
970
971 #[test]
972 fn remember_result_elapsed_seconds_serializes_as_null_when_none() {
973 let mut r = sample_result(RememberStatus::Completed);
974 r.elapsed_seconds = None;
975 let v = r.to_dict();
976 let obj = v.as_object().expect("object");
977 assert!(
978 obj.contains_key("elapsed_seconds"),
979 "elapsed_seconds key should be present even when None (Python parity)"
980 );
981 assert!(
982 obj.get("elapsed_seconds").is_some_and(|v| v.is_null()),
983 "elapsed_seconds should serialize as null when None"
984 );
985 }
986
987 #[test]
988 fn is_success_completed_and_session_stored() {
989 let mut r = sample_result(RememberStatus::Completed);
990 assert!(r.is_success());
991 assert!(r.done());
992
993 r.status = RememberStatus::SessionStored;
994 assert!(r.is_success());
995 assert!(r.done());
996 }
997
998 #[test]
999 fn is_success_errored() {
1000 let r = sample_result(RememberStatus::Errored);
1001 assert!(!r.is_success());
1002 assert!(r.done());
1004 }
1005
1006 #[test]
1007 fn all_statuses_are_done() {
1008 for status in [
1009 RememberStatus::Completed,
1010 RememberStatus::Errored,
1011 RememberStatus::SessionStored,
1012 ] {
1013 let r = sample_result(status);
1014 assert!(r.done(), "expected done() == true for {status:?}");
1015 }
1016 }
1017
1018 #[test]
1019 fn display_format_has_status_and_dataset() {
1020 let r = sample_result(RememberStatus::Completed);
1021 let text = format!("{r}");
1022 assert!(text.contains("RememberResult("));
1023 assert!(text.contains("status=Completed"));
1024 assert!(text.contains("dataset="));
1025 assert!(text.ends_with(')'));
1026 }
1027
1028 #[test]
1029 fn to_dict_omits_skipped_fields() {
1030 let r = sample_result(RememberStatus::Completed);
1031 let v = r.to_dict();
1032 assert!(v.is_object());
1033 let obj = v.as_object().expect("object");
1034 assert!(obj.contains_key("status"));
1035 assert!(obj.contains_key("dataset_name"));
1036 assert!(!obj.contains_key("cognify_result"));
1038 assert!(!obj.contains_key("memify_result"));
1039 }
1040
1041 #[test]
1042 fn display_formats_single_session_id() {
1043 let mut r = sample_result(RememberStatus::SessionStored);
1044 r.session_ids = Some(vec!["sess-123".to_string()]);
1045 let text = format!("{r}");
1046 assert!(text.contains("session_id=\"sess-123\""));
1047 assert!(!text.contains("session_ids="));
1048 }
1049
1050 fn sample_result(status: RememberStatus) -> RememberResult {
1051 RememberResult {
1052 status,
1053 dataset_name: "main_dataset".to_string(),
1054 dataset_id: None,
1055 session_ids: None,
1056 pipeline_run_id: None,
1057 elapsed_seconds: Some(1.23),
1058 content_hash: None,
1059 items_processed: 0,
1060 items: Vec::new(),
1061 error: None,
1062 entry_type: None,
1063 entry_id: None,
1064 cognify_result: None,
1065 memify_result: None,
1066 }
1067 }
1068}