1use crate::channel::Channel;
5use zeph_agent_persistence::graph::{build_graph_extraction_config, collect_context_messages};
6use zeph_agent_persistence::{
7 MemoryPersistenceView, MetricsView, PersistMessageRequest, PersistenceService, SecurityView,
8};
9use zeph_llm::provider::{LlmProvider as _, MessagePart, Role};
10
11use super::Agent;
12
13impl<C: Channel> Agent<C> {
14 pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
30 let (Some(memory), Some(cid)) = (
31 self.services.memory.persistence.memory.as_ref(),
32 self.services.memory.persistence.conversation_id,
33 ) else {
34 return Ok(());
35 };
36
37 let memory = memory.clone();
39
40 let mut unsummarized = self.services.memory.persistence.unsummarized_count;
41 let memory_view = MemoryPersistenceView {
44 memory: Some(&memory),
45 conversation_id: self.services.memory.persistence.conversation_id,
46 autosave_assistant: self.services.memory.persistence.autosave_assistant,
47 autosave_min_length: self.services.memory.persistence.autosave_min_length,
48 unsummarized_count: &mut unsummarized,
49 goal_text: self.services.memory.extraction.goal_text.clone(),
50 };
51 let mut sqlite_delta = 0u64;
52 let mut embed_delta = 0u64;
53 let mut guard_delta = 0u64;
54 let mut metrics_view = MetricsView {
55 sqlite_message_count: &mut sqlite_delta,
56 embeddings_generated: &mut embed_delta,
57 exfiltration_memory_guards: &mut guard_delta,
58 };
59
60 let svc = PersistenceService::new();
61 let outcome = svc
62 .load_history(
63 &mut self.msg.messages,
64 &mut self.msg.last_persisted_message_id,
65 &mut self.msg.deferred_db_hide_ids,
66 &mut self.msg.deferred_db_summaries,
67 &memory_view,
68 &zeph_config::Config::default(),
69 &mut metrics_view,
70 )
71 .await
72 .map_err(|e| {
73 super::error::AgentError::Memory(zeph_memory::MemoryError::Other(e.to_string()))
74 })?;
75
76 self.services.memory.persistence.unsummarized_count = unsummarized;
78
79 if outcome.messages_loaded > 0 {
80 let _ = memory
82 .sqlite()
83 .increment_session_counts_for_conversation(cid)
84 .await
85 .inspect_err(|e| {
86 tracing::warn!(error = %e, "failed to increment tier session counts");
87 });
88 }
89
90 self.update_metrics(|m| {
92 m.sqlite_message_count = outcome.sqlite_total_messages;
93 });
94 if let Ok(count) = memory.sqlite().count_semantic_facts().await {
95 let count_u64 = u64::try_from(count).unwrap_or(0);
96 self.update_metrics(|m| {
97 m.semantic_fact_count = count_u64;
98 });
99 }
100 if let Ok(count) = memory.unsummarized_message_count(cid).await {
101 self.services.memory.persistence.unsummarized_count =
102 usize::try_from(count).unwrap_or(0);
103 }
104
105 self.recompute_prompt_tokens();
106 Ok(())
107 }
108
109 #[cfg_attr(
115 feature = "profiling",
116 tracing::instrument(name = "agent.persist_message", skip_all)
117 )]
118 pub(crate) async fn persist_message(
119 &mut self,
120 role: Role,
121 content: &str,
122 parts: &[MessagePart],
123 has_injection_flags: bool,
124 ) {
125 let guard_event = self
129 .services
130 .security
131 .exfiltration_guard
132 .should_guard_memory_write(has_injection_flags);
133 if let Some(ref event) = guard_event {
134 tracing::warn!(
135 ?event,
136 "exfiltration guard: skipping Qdrant embedding for flagged content"
137 );
138 self.push_security_event(
139 zeph_common::SecurityEventCategory::ExfiltrationBlock,
140 "memory_write",
141 "Qdrant embedding skipped: flagged content",
142 );
143 }
144
145 let req = PersistMessageRequest::from_borrowed(role, content, parts, has_injection_flags);
146
147 let mut unsummarized = self.services.memory.persistence.unsummarized_count;
148 let memory_arc = self.services.memory.persistence.memory.clone();
149 let mut memory_view = MemoryPersistenceView {
150 memory: memory_arc.as_ref(),
151 conversation_id: self.services.memory.persistence.conversation_id,
152 autosave_assistant: self.services.memory.persistence.autosave_assistant,
153 autosave_min_length: self.services.memory.persistence.autosave_min_length,
154 unsummarized_count: &mut unsummarized,
155 goal_text: self.services.memory.extraction.goal_text.clone(),
156 };
157 let security = SecurityView {
158 guard_memory_writes: guard_event.is_some(),
159 _phantom: std::marker::PhantomData,
160 };
161 let mut sqlite_delta = 0u64;
162 let mut embed_delta = 0u64;
163 let mut guard_delta = 0u64;
164 let mut metrics_view = MetricsView {
165 sqlite_message_count: &mut sqlite_delta,
166 embeddings_generated: &mut embed_delta,
167 exfiltration_memory_guards: &mut guard_delta,
168 };
169
170 let svc = PersistenceService::new();
171 let outcome = svc
172 .persist_message(
173 req,
174 &mut self.msg.last_persisted_message_id,
175 &mut memory_view,
176 &security,
177 &zeph_config::Config::default(),
178 &mut metrics_view,
179 )
180 .await;
181
182 self.services.memory.persistence.unsummarized_count = unsummarized;
184
185 self.update_metrics(|m| {
187 m.sqlite_message_count += sqlite_delta;
188 m.embeddings_generated += embed_delta;
189 m.exfiltration_memory_guards += guard_delta;
191 });
192
193 if outcome.message_id.is_none() {
194 return;
195 }
196
197 self.enqueue_summarization_task();
201
202 let has_tool_result_parts = parts
205 .iter()
206 .any(|p| matches!(p, MessagePart::ToolResult { .. }));
207
208 self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
209 .await;
210
211 if role == Role::User && !has_tool_result_parts && !has_injection_flags {
213 self.enqueue_persona_extraction_task();
214 }
215
216 if has_tool_result_parts {
218 self.enqueue_trajectory_extraction_task();
219 }
220
221 let has_tool_use_parts = parts
226 .iter()
227 .any(|p| matches!(p, MessagePart::ToolUse { .. }));
228 if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
229 self.enqueue_reasoning_extraction_task();
230 self.enqueue_memcot_distill_task(content);
232 }
233 }
234
235 fn enqueue_memcot_distill_task(&mut self, assistant_content: &str) {
240 let Some(accumulator) = &self.services.memory.extraction.memcot_accumulator else {
241 return;
242 };
243 let distill_provider_name = self
244 .services
245 .memory
246 .extraction
247 .memcot_config
248 .distill_provider
249 .as_str();
250 let provider = self.resolve_background_provider(distill_provider_name);
251
252 let content = assistant_content.to_owned();
253 let supervisor = &mut self.runtime.lifecycle.supervisor;
254
255 accumulator.maybe_enqueue_distill(&content, provider, |name, fut| {
256 supervisor.spawn(super::agent_supervisor::TaskClass::Enrichment, name, fut);
257 });
258 }
259
260 fn enqueue_summarization_task(&mut self) {
262 let (Some(memory), Some(cid)) = (
263 self.services.memory.persistence.memory.clone(),
264 self.services.memory.persistence.conversation_id,
265 ) else {
266 return;
267 };
268
269 if self.services.memory.persistence.unsummarized_count
270 <= self.services.memory.compaction.summarization_threshold
271 {
272 return;
273 }
274
275 let batch_size = self.services.memory.compaction.summarization_threshold / 2;
276
277 self.runtime.lifecycle.supervisor.spawn_summarization("summarization", async move {
278 match tokio::time::timeout(
279 std::time::Duration::from_secs(30),
280 memory.summarize(cid, batch_size),
281 )
282 .await
283 {
284 Ok(Ok(Some(summary_id))) => {
285 tracing::info!(
286 "background summarization: created summary {summary_id} for conversation {cid}"
287 );
288 true
289 }
290 Ok(Ok(None)) => {
291 tracing::debug!("background summarization: no summarization needed");
292 false
293 }
294 Ok(Err(e)) => {
295 tracing::error!("background summarization failed: {e:#}");
296 false
297 }
298 Err(_) => {
299 tracing::warn!("background summarization timed out after 30s");
300 false
301 }
302 }
303 });
304 }
305
306 async fn enqueue_graph_extraction_task(
311 &mut self,
312 content: &str,
313 has_injection_flags: bool,
314 has_tool_result_parts: bool,
315 ) {
316 if self.services.memory.persistence.memory.is_none()
317 || self.services.memory.persistence.conversation_id.is_none()
318 {
319 return;
320 }
321 if has_tool_result_parts {
322 tracing::debug!("graph extraction skipped: message contains ToolResult parts");
323 return;
324 }
325 if has_injection_flags {
326 tracing::warn!("graph extraction skipped: injection patterns detected in content");
327 return;
328 }
329
330 let cfg = &self.services.memory.extraction.graph_config;
331 if !cfg.enabled {
332 return;
333 }
334 let extraction_cfg = build_graph_extraction_config(
335 cfg,
336 self.services
337 .memory
338 .persistence
339 .conversation_id
340 .map(|c| c.0),
341 );
342 let extract_provider_name = cfg.extract_provider.as_str().to_owned();
345
346 if self.rpe_should_skip(content).await {
349 tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
350 return;
351 }
352
353 let context_messages = collect_context_messages(&self.msg.messages);
354
355 let Some(memory) = self.services.memory.persistence.memory.clone() else {
356 return;
357 };
358
359 let validator: zeph_memory::semantic::PostExtractValidator =
360 if self.services.security.memory_validator.is_enabled() {
361 let v = self.services.security.memory_validator.clone();
362 Some(Box::new(move |result| {
363 v.validate_graph_extraction(result)
364 .map_err(|e| e.to_string())
365 }))
366 } else {
367 None
368 };
369
370 let provider_override = if extract_provider_name.is_empty() {
371 None
372 } else {
373 Some(self.resolve_background_provider(&extract_provider_name))
374 };
375
376 self.spawn_graph_extraction_task(
377 memory,
378 content,
379 context_messages,
380 extraction_cfg,
381 validator,
382 provider_override,
383 );
384
385 self.sync_community_detection_failures();
387 self.sync_graph_extraction_metrics();
388 self.enqueue_graph_count_sync_task();
389 }
390
391 fn spawn_graph_extraction_task(
392 &mut self,
393 memory: std::sync::Arc<zeph_memory::semantic::SemanticMemory>,
394 content: &str,
395 context_messages: Vec<String>,
396 extraction_cfg: zeph_memory::semantic::GraphExtractionConfig,
397 validator: zeph_memory::semantic::PostExtractValidator,
398 provider_override: Option<zeph_llm::any::AnyProvider>,
399 ) {
400 let content_owned = content.to_owned();
401 let graph_store = memory.graph_store.clone();
402 let metrics_tx = self.runtime.metrics.metrics_tx.clone();
403 let start_time = self.runtime.lifecycle.start_time;
404
405 self.runtime.lifecycle.supervisor.spawn(
406 super::agent_supervisor::TaskClass::Enrichment,
407 "graph_extraction",
408 async move {
409 let extraction_handle = memory.spawn_graph_extraction(
410 content_owned,
411 context_messages,
412 extraction_cfg,
413 validator,
414 provider_override,
415 );
416
417 if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
419 let _ = extraction_handle.await;
420 let (entities, edges, communities) = tokio::join!(
421 store.entity_count(),
422 store.active_edge_count(),
423 store.community_count()
424 );
425 let elapsed = start_time.elapsed().as_secs();
426 tx.send_modify(|m| {
427 m.uptime_seconds = elapsed;
428 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
429 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
430 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
431 });
432 } else {
433 let _ = extraction_handle.await;
434 }
435
436 tracing::debug!("background graph extraction complete");
437 },
438 );
439 }
440
441 fn enqueue_graph_count_sync_task(&mut self) {
443 let memory_for_sync = self.services.memory.persistence.memory.clone();
444 let metrics_tx_sync = self.runtime.metrics.metrics_tx.clone();
445 let start_time_sync = self.runtime.lifecycle.start_time;
446 let cid_sync = self.services.memory.persistence.conversation_id;
447 let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
448 let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
449 let guidelines_enabled = self.services.memory.extraction.graph_config.enabled;
450
451 self.runtime.lifecycle.supervisor.spawn(
452 super::agent_supervisor::TaskClass::Telemetry,
453 "graph_count_sync",
454 async move {
455 let Some(store) = graph_store_sync else {
456 return;
457 };
458 let Some(tx) = metrics_tx_sync else { return };
459
460 let (entities, edges, communities) = tokio::join!(
461 store.entity_count(),
462 store.active_edge_count(),
463 store.community_count()
464 );
465 let elapsed = start_time_sync.elapsed().as_secs();
466 tx.send_modify(|m| {
467 m.uptime_seconds = elapsed;
468 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
469 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
470 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
471 });
472
473 if guidelines_enabled && let Some(sqlite) = sqlite_sync {
475 match tokio::time::timeout(
476 std::time::Duration::from_secs(10),
477 sqlite.load_compression_guidelines_meta(cid_sync),
478 )
479 .await
480 {
481 Ok(Ok((version, created_at))) => {
482 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
483 let version_u32 = u32::try_from(version).unwrap_or(0);
484 tx.send_modify(|m| {
485 m.guidelines_version = version_u32;
486 m.guidelines_updated_at = created_at;
487 });
488 }
489 Ok(Err(e)) => {
490 tracing::debug!("guidelines status sync failed: {e:#}");
491 }
492 Err(_) => {
493 tracing::debug!("guidelines status sync timed out");
494 }
495 }
496 }
497 },
498 );
499 }
500
501 fn enqueue_persona_extraction_task(&mut self) {
503 use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
504
505 let cfg = &self.services.memory.extraction.persona_config;
506 if !cfg.enabled {
507 return;
508 }
509
510 let Some(memory) = &self.services.memory.persistence.memory else {
511 return;
512 };
513
514 let user_messages: Vec<String> = self
515 .msg
516 .messages
517 .iter()
518 .filter(|m| {
519 m.role == Role::User
520 && !m
521 .parts
522 .iter()
523 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
524 })
525 .take(8)
526 .map(|m| {
527 if m.content.len() > 2048 {
528 m.content[..m.content.floor_char_boundary(2048)].to_owned()
529 } else {
530 m.content.clone()
531 }
532 })
533 .collect();
534
535 if user_messages.len() < cfg.min_messages {
536 return;
537 }
538
539 let timeout_secs = cfg.extraction_timeout_secs;
540 let extraction_cfg = PersonaExtractionConfig {
541 enabled: cfg.enabled,
542 min_messages: cfg.min_messages,
543 max_messages: cfg.max_messages,
544 extraction_timeout_secs: timeout_secs,
545 };
546
547 let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
548 let store = memory.sqlite().clone();
549 let conversation_id = self
550 .services
551 .memory
552 .persistence
553 .conversation_id
554 .map(|c| c.0);
555
556 self.runtime.lifecycle.supervisor.spawn(
557 super::agent_supervisor::TaskClass::Enrichment,
558 "persona_extraction",
559 async move {
560 let user_message_refs: Vec<&str> =
561 user_messages.iter().map(String::as_str).collect();
562 let fut = extract_persona_facts(
563 &store,
564 &provider,
565 &user_message_refs,
566 &extraction_cfg,
567 conversation_id,
568 );
569 match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
570 {
571 Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
572 Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
573 Err(_) => tracing::warn!(
574 timeout_secs,
575 "persona extraction timed out — no facts written this turn"
576 ),
577 }
578 },
579 );
580 }
581
582 fn enqueue_trajectory_extraction_task(&mut self) {
584 use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
585
586 let cfg = self.services.memory.extraction.trajectory_config.clone();
587 if !cfg.enabled {
588 return;
589 }
590
591 let Some(memory) = &self.services.memory.persistence.memory else {
592 return;
593 };
594
595 let conversation_id = match self.services.memory.persistence.conversation_id {
596 Some(cid) => cid.0,
597 None => return,
598 };
599
600 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
601 let turn_messages: Vec<zeph_llm::provider::Message> =
602 self.msg.messages[tail_start..].to_vec();
603
604 if turn_messages.is_empty() {
605 return;
606 }
607
608 let extraction_cfg = TrajectoryExtractionConfig {
609 enabled: cfg.enabled,
610 max_messages: cfg.max_messages,
611 extraction_timeout_secs: cfg.extraction_timeout_secs,
612 };
613
614 let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
615 let store = memory.sqlite().clone();
616 let min_confidence = cfg.min_confidence;
617
618 self.runtime.lifecycle.supervisor.spawn(
619 super::agent_supervisor::TaskClass::Enrichment,
620 "trajectory_extraction",
621 async move {
622 let entries =
623 match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
624 .await
625 {
626 Ok(e) => e,
627 Err(e) => {
628 tracing::warn!(error = %e, "trajectory extraction failed");
629 return;
630 }
631 };
632
633 let last_id = store
634 .trajectory_last_extracted_message_id(conversation_id)
635 .await
636 .unwrap_or(0);
637
638 let mut max_id = last_id;
639 for entry in &entries {
640 if entry.confidence < min_confidence {
641 continue;
642 }
643 let tools_json = serde_json::to_string(&entry.tools_used)
644 .unwrap_or_else(|_| "[]".to_string());
645 match store
646 .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
647 conversation_id: Some(conversation_id),
648 turn_index: 0,
649 kind: &entry.kind,
650 intent: &entry.intent,
651 outcome: &entry.outcome,
652 tools_used: &tools_json,
653 confidence: entry.confidence,
654 })
655 .await
656 {
657 Ok(id) => {
658 if id > max_id {
659 max_id = id;
660 }
661 }
662 Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
663 }
664 }
665
666 if max_id > last_id {
667 let _ = store
668 .set_trajectory_last_extracted_message_id(conversation_id, max_id)
669 .await;
670 }
671
672 tracing::debug!(
673 count = entries.len(),
674 conversation_id,
675 "trajectory extraction complete"
676 );
677 },
678 );
679 }
680
681 fn enqueue_reasoning_extraction_task(&mut self) {
686 let cfg = self.services.memory.extraction.reasoning_config.clone();
687 if !cfg.enabled {
688 return;
689 }
690
691 let Some(memory) = &self.services.memory.persistence.memory else {
692 return;
693 };
694
695 let Some(reasoning) = memory.reasoning.clone() else {
696 return;
697 };
698
699 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
700 let turn_messages: Vec<zeph_llm::provider::Message> =
701 self.msg.messages[tail_start..].to_vec();
702
703 if turn_messages.len() < cfg.min_messages {
704 return;
705 }
706
707 let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
708 let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
709 let embed_provider = memory.effective_embed_provider().clone();
710 let store_limit = cfg.store_limit;
711 let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
712 let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
713 let self_judge_window = cfg.self_judge_window;
714 let min_assistant_chars = cfg.min_assistant_chars;
715
716 self.runtime.lifecycle.supervisor.spawn(
717 super::agent_supervisor::TaskClass::Enrichment,
718 "reasoning_extraction",
719 async move {
720 if let Err(e) = zeph_memory::process_reasoning_turn(
721 &reasoning,
722 &extract_provider,
723 &distill_provider,
724 &embed_provider,
725 &turn_messages,
726 zeph_memory::ProcessTurnConfig {
727 store_limit,
728 extraction_timeout,
729 distill_timeout,
730 self_judge_window,
731 min_assistant_chars,
732 },
733 )
734 .await
735 {
736 tracing::warn!(error = %e, "reasoning: process_turn failed");
737 }
738
739 tracing::debug!("reasoning extraction complete");
740 },
741 );
742 }
743
744 async fn rpe_should_skip(&mut self, content: &str) -> bool {
749 let Some(ref rpe_mutex) = self.services.memory.extraction.rpe_router else {
750 return false;
751 };
752 let Some(memory) = &self.services.memory.persistence.memory else {
753 return false;
754 };
755 let candidates = zeph_memory::extract_candidate_entities(content);
756 let provider = memory.provider();
757 let Ok(Ok(emb_vec)) =
758 tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
759 else {
760 return false; };
762 if let Ok(mut router) = rpe_mutex.lock() {
763 let signal = router.compute(&emb_vec, &candidates);
764 router.push_embedding(emb_vec);
765 router.push_entities(&candidates);
766 !signal.should_extract
767 } else {
768 tracing::warn!("rpe_router mutex poisoned; falling through to extract");
769 false
770 }
771 }
772}
773
774#[cfg(test)]
775mod tests {
776 use super::super::agent_tests::{
777 MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
778 };
779 use super::*;
780 use zeph_llm::any::AnyProvider;
781 use zeph_llm::provider::Message;
782 use zeph_memory::semantic::SemanticMemory;
783
784 async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
785 SemanticMemory::new(
786 ":memory:",
787 "http://127.0.0.1:1",
788 None,
789 provider.clone(),
790 "test-model",
791 )
792 .await
793 .unwrap()
794 }
795
796 #[tokio::test]
797 async fn load_history_without_memory_returns_ok() {
798 let provider = mock_provider(vec![]);
799 let channel = MockChannel::new(vec![]);
800 let registry = create_test_registry();
801 let executor = MockToolExecutor::no_tools();
802 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
803
804 let result = agent.load_history().await;
805 assert!(result.is_ok());
806 assert_eq!(agent.msg.messages.len(), 1); }
809
810 #[tokio::test]
811 async fn load_history_with_messages_injects_into_agent() {
812 let provider = mock_provider(vec![]);
813 let channel = MockChannel::new(vec![]);
814 let registry = create_test_registry();
815 let executor = MockToolExecutor::no_tools();
816
817 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
818 let cid = memory.sqlite().create_conversation().await.unwrap();
819
820 memory
821 .sqlite()
822 .save_message(cid, "user", "hello from history")
823 .await
824 .unwrap();
825 memory
826 .sqlite()
827 .save_message(cid, "assistant", "hi back")
828 .await
829 .unwrap();
830
831 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
832 std::sync::Arc::new(memory),
833 cid,
834 50,
835 5,
836 100,
837 );
838
839 let messages_before = agent.msg.messages.len();
840 agent.load_history().await.unwrap();
841 assert_eq!(agent.msg.messages.len(), messages_before + 2);
843 }
844
845 #[tokio::test]
846 async fn load_history_skips_empty_messages() {
847 let provider = mock_provider(vec![]);
848 let channel = MockChannel::new(vec![]);
849 let registry = create_test_registry();
850 let executor = MockToolExecutor::no_tools();
851
852 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
853 let cid = memory.sqlite().create_conversation().await.unwrap();
854
855 memory
857 .sqlite()
858 .save_message(cid, "user", " ")
859 .await
860 .unwrap();
861 memory
862 .sqlite()
863 .save_message(cid, "user", "real message")
864 .await
865 .unwrap();
866
867 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
868 std::sync::Arc::new(memory),
869 cid,
870 50,
871 5,
872 100,
873 );
874
875 let messages_before = agent.msg.messages.len();
876 agent.load_history().await.unwrap();
877 assert_eq!(agent.msg.messages.len(), messages_before + 1);
879 }
880
881 #[tokio::test]
882 async fn load_history_with_empty_store_returns_ok() {
883 let provider = mock_provider(vec![]);
884 let channel = MockChannel::new(vec![]);
885 let registry = create_test_registry();
886 let executor = MockToolExecutor::no_tools();
887
888 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
889 let cid = memory.sqlite().create_conversation().await.unwrap();
890
891 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
892 std::sync::Arc::new(memory),
893 cid,
894 50,
895 5,
896 100,
897 );
898
899 let messages_before = agent.msg.messages.len();
900 agent.load_history().await.unwrap();
901 assert_eq!(agent.msg.messages.len(), messages_before);
903 }
904
905 #[tokio::test]
906 async fn load_history_increments_session_count_for_existing_messages() {
907 let provider = mock_provider(vec![]);
908 let channel = MockChannel::new(vec![]);
909 let registry = create_test_registry();
910 let executor = MockToolExecutor::no_tools();
911
912 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
913 let cid = memory.sqlite().create_conversation().await.unwrap();
914
915 let id1 = memory
917 .sqlite()
918 .save_message(cid, "user", "hello")
919 .await
920 .unwrap();
921 let id2 = memory
922 .sqlite()
923 .save_message(cid, "assistant", "hi")
924 .await
925 .unwrap();
926
927 let memory_arc = std::sync::Arc::new(memory);
928 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
929 memory_arc.clone(),
930 cid,
931 50,
932 5,
933 100,
934 );
935
936 agent.load_history().await.unwrap();
937
938 let counts: Vec<i64> = zeph_db::query_scalar(
940 "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
941 )
942 .bind(id1)
943 .bind(id2)
944 .fetch_all(memory_arc.sqlite().pool())
945 .await
946 .unwrap();
947 assert_eq!(
948 counts,
949 vec![1, 1],
950 "session_count must be 1 after first restore"
951 );
952 }
953
954 #[tokio::test]
955 async fn load_history_does_not_increment_session_count_for_new_conversation() {
956 let provider = mock_provider(vec![]);
957 let channel = MockChannel::new(vec![]);
958 let registry = create_test_registry();
959 let executor = MockToolExecutor::no_tools();
960
961 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
962 let cid = memory.sqlite().create_conversation().await.unwrap();
963
964 let memory_arc = std::sync::Arc::new(memory);
966 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
967 memory_arc.clone(),
968 cid,
969 50,
970 5,
971 100,
972 );
973
974 agent.load_history().await.unwrap();
975
976 let counts: Vec<i64> =
978 zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
979 .bind(cid)
980 .fetch_all(memory_arc.sqlite().pool())
981 .await
982 .unwrap();
983 assert!(counts.is_empty(), "new conversation must have no messages");
984 }
985
986 #[tokio::test]
987 async fn persist_message_without_memory_silently_returns() {
988 let provider = mock_provider(vec![]);
990 let channel = MockChannel::new(vec![]);
991 let registry = create_test_registry();
992 let executor = MockToolExecutor::no_tools();
993 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
994
995 agent.persist_message(Role::User, "hello", &[], false).await;
997 }
998
999 #[tokio::test]
1000 async fn persist_message_assistant_autosave_false_uses_save_only() {
1001 let provider = mock_provider(vec![]);
1002 let channel = MockChannel::new(vec![]);
1003 let registry = create_test_registry();
1004 let executor = MockToolExecutor::no_tools();
1005
1006 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1007 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1008 let cid = memory.sqlite().create_conversation().await.unwrap();
1009
1010 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1011 .with_metrics(tx)
1012 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1013 agent.services.memory.persistence.autosave_assistant = false;
1014 agent.services.memory.persistence.autosave_min_length = 20;
1015
1016 agent
1017 .persist_message(Role::Assistant, "short assistant reply", &[], false)
1018 .await;
1019
1020 let history = agent
1021 .services
1022 .memory
1023 .persistence
1024 .memory
1025 .as_ref()
1026 .unwrap()
1027 .sqlite()
1028 .load_history(cid, 50)
1029 .await
1030 .unwrap();
1031 assert_eq!(history.len(), 1, "message must be saved");
1032 assert_eq!(history[0].content, "short assistant reply");
1033 assert_eq!(rx.borrow().embeddings_generated, 0);
1035 }
1036
1037 #[tokio::test]
1038 async fn persist_message_assistant_below_min_length_uses_save_only() {
1039 let provider = mock_provider(vec![]);
1040 let channel = MockChannel::new(vec![]);
1041 let registry = create_test_registry();
1042 let executor = MockToolExecutor::no_tools();
1043
1044 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1045 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1046 let cid = memory.sqlite().create_conversation().await.unwrap();
1047
1048 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1050 .with_metrics(tx)
1051 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1052 agent.services.memory.persistence.autosave_assistant = true;
1053 agent.services.memory.persistence.autosave_min_length = 1000;
1054
1055 agent
1056 .persist_message(Role::Assistant, "too short", &[], false)
1057 .await;
1058
1059 let history = agent
1060 .services
1061 .memory
1062 .persistence
1063 .memory
1064 .as_ref()
1065 .unwrap()
1066 .sqlite()
1067 .load_history(cid, 50)
1068 .await
1069 .unwrap();
1070 assert_eq!(history.len(), 1, "message must be saved");
1071 assert_eq!(history[0].content, "too short");
1072 assert_eq!(rx.borrow().embeddings_generated, 0);
1073 }
1074
1075 #[tokio::test]
1076 async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1077 let provider = mock_provider(vec![]);
1079 let channel = MockChannel::new(vec![]);
1080 let registry = create_test_registry();
1081 let executor = MockToolExecutor::no_tools();
1082
1083 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1084 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1085 let cid = memory.sqlite().create_conversation().await.unwrap();
1086
1087 let min_length = 10usize;
1088 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1089 .with_metrics(tx)
1090 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1091 agent.services.memory.persistence.autosave_assistant = true;
1092 agent.services.memory.persistence.autosave_min_length = min_length;
1093
1094 let content_at_boundary = "A".repeat(min_length);
1096 assert_eq!(content_at_boundary.len(), min_length);
1097 agent
1098 .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1099 .await;
1100
1101 assert_eq!(rx.borrow().sqlite_message_count, 1);
1103 }
1104
1105 #[tokio::test]
1106 async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1107 let provider = mock_provider(vec![]);
1109 let channel = MockChannel::new(vec![]);
1110 let registry = create_test_registry();
1111 let executor = MockToolExecutor::no_tools();
1112
1113 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1114 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1115 let cid = memory.sqlite().create_conversation().await.unwrap();
1116
1117 let min_length = 10usize;
1118 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1119 .with_metrics(tx)
1120 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1121 agent.services.memory.persistence.autosave_assistant = true;
1122 agent.services.memory.persistence.autosave_min_length = min_length;
1123
1124 let content_below_boundary = "A".repeat(min_length - 1);
1126 assert_eq!(content_below_boundary.len(), min_length - 1);
1127 agent
1128 .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1129 .await;
1130
1131 let history = agent
1132 .services
1133 .memory
1134 .persistence
1135 .memory
1136 .as_ref()
1137 .unwrap()
1138 .sqlite()
1139 .load_history(cid, 50)
1140 .await
1141 .unwrap();
1142 assert_eq!(history.len(), 1, "message must still be saved");
1143 assert_eq!(rx.borrow().embeddings_generated, 0);
1145 }
1146
1147 #[tokio::test]
1148 async fn persist_message_increments_unsummarized_count() {
1149 let provider = mock_provider(vec![]);
1150 let channel = MockChannel::new(vec![]);
1151 let registry = create_test_registry();
1152 let executor = MockToolExecutor::no_tools();
1153
1154 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1155 let cid = memory.sqlite().create_conversation().await.unwrap();
1156
1157 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1159 std::sync::Arc::new(memory),
1160 cid,
1161 50,
1162 5,
1163 100,
1164 );
1165
1166 assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1167
1168 agent.persist_message(Role::User, "first", &[], false).await;
1169 assert_eq!(agent.services.memory.persistence.unsummarized_count, 1);
1170
1171 agent
1172 .persist_message(Role::User, "second", &[], false)
1173 .await;
1174 assert_eq!(agent.services.memory.persistence.unsummarized_count, 2);
1175 }
1176
1177 #[tokio::test]
1178 async fn check_summarization_resets_counter_on_success() {
1179 let provider = mock_provider(vec![]);
1180 let channel = MockChannel::new(vec![]);
1181 let registry = create_test_registry();
1182 let executor = MockToolExecutor::no_tools();
1183
1184 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1185 let cid = memory.sqlite().create_conversation().await.unwrap();
1186
1187 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1189 std::sync::Arc::new(memory),
1190 cid,
1191 50,
1192 5,
1193 1,
1194 );
1195
1196 agent.persist_message(Role::User, "msg1", &[], false).await;
1197 agent.persist_message(Role::User, "msg2", &[], false).await;
1198
1199 assert!(agent.services.memory.persistence.unsummarized_count <= 2);
1204 }
1205
1206 #[tokio::test]
1207 async fn unsummarized_count_not_incremented_without_memory() {
1208 let provider = mock_provider(vec![]);
1209 let channel = MockChannel::new(vec![]);
1210 let registry = create_test_registry();
1211 let executor = MockToolExecutor::no_tools();
1212 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1213
1214 agent.persist_message(Role::User, "hello", &[], false).await;
1215 assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1217 }
1218
1219 mod graph_extraction_guards {
1221 use super::*;
1222 use crate::config::GraphConfig;
1223 use zeph_llm::provider::MessageMetadata;
1224 use zeph_memory::graph::GraphStore;
1225
1226 fn enabled_graph_config() -> GraphConfig {
1227 GraphConfig {
1228 enabled: true,
1229 ..GraphConfig::default()
1230 }
1231 }
1232
1233 async fn agent_with_graph(
1234 provider: &AnyProvider,
1235 config: GraphConfig,
1236 ) -> Agent<MockChannel> {
1237 let memory =
1238 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1239 let cid = memory.sqlite().create_conversation().await.unwrap();
1240 Agent::new(
1241 provider.clone(),
1242 MockChannel::new(vec![]),
1243 create_test_registry(),
1244 None,
1245 5,
1246 MockToolExecutor::no_tools(),
1247 )
1248 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1249 .with_graph_config(config)
1250 }
1251
1252 #[tokio::test]
1253 async fn injection_flag_guard_skips_extraction() {
1254 let provider = mock_provider(vec![]);
1256 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1257 let pool = agent
1258 .services
1259 .memory
1260 .persistence
1261 .memory
1262 .as_ref()
1263 .unwrap()
1264 .sqlite()
1265 .pool()
1266 .clone();
1267
1268 agent
1269 .enqueue_graph_extraction_task("I use Rust", true, false)
1270 .await;
1271
1272 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1274
1275 let store = GraphStore::new(pool);
1276 let count = store.get_metadata("extraction_count").await.unwrap();
1277 assert!(
1278 count.is_none(),
1279 "injection flag must prevent extraction_count from being written"
1280 );
1281 }
1282
1283 #[tokio::test]
1284 async fn disabled_config_guard_skips_extraction() {
1285 let provider = mock_provider(vec![]);
1287 let disabled_cfg = GraphConfig {
1288 enabled: false,
1289 ..GraphConfig::default()
1290 };
1291 let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1292 let pool = agent
1293 .services
1294 .memory
1295 .persistence
1296 .memory
1297 .as_ref()
1298 .unwrap()
1299 .sqlite()
1300 .pool()
1301 .clone();
1302
1303 agent
1304 .enqueue_graph_extraction_task("I use Rust", false, false)
1305 .await;
1306
1307 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1308
1309 let store = GraphStore::new(pool);
1310 let count = store.get_metadata("extraction_count").await.unwrap();
1311 assert!(
1312 count.is_none(),
1313 "disabled graph config must prevent extraction"
1314 );
1315 }
1316
1317 #[tokio::test]
1318 async fn happy_path_fires_extraction() {
1319 let provider = mock_provider(vec![]);
1322 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1323 let pool = agent
1324 .services
1325 .memory
1326 .persistence
1327 .memory
1328 .as_ref()
1329 .unwrap()
1330 .sqlite()
1331 .pool()
1332 .clone();
1333
1334 agent
1335 .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1336 .await;
1337
1338 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1340
1341 let store = GraphStore::new(pool);
1342 let count = store.get_metadata("extraction_count").await.unwrap();
1343 assert!(
1344 count.is_some(),
1345 "happy-path extraction must increment extraction_count"
1346 );
1347 }
1348
1349 #[tokio::test]
1350 async fn tool_result_parts_guard_skips_extraction() {
1351 let provider = mock_provider(vec![]);
1355 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1356 let pool = agent
1357 .services
1358 .memory
1359 .persistence
1360 .memory
1361 .as_ref()
1362 .unwrap()
1363 .sqlite()
1364 .pool()
1365 .clone();
1366
1367 agent
1368 .enqueue_graph_extraction_task(
1369 "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1370 false,
1371 true, )
1373 .await;
1374
1375 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1376
1377 let store = GraphStore::new(pool);
1378 let count = store.get_metadata("extraction_count").await.unwrap();
1379 assert!(
1380 count.is_none(),
1381 "tool result message must not trigger graph extraction"
1382 );
1383 }
1384
1385 #[tokio::test]
1386 async fn context_filter_excludes_tool_result_messages() {
1387 let provider = mock_provider(vec![]);
1398 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1399
1400 agent.msg.messages.push(Message {
1403 role: Role::User,
1404 content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1405 parts: vec![MessagePart::ToolResult {
1406 tool_use_id: "abc".to_owned(),
1407 content: "provider_type = \"openai\"".to_owned(),
1408 is_error: false,
1409 }],
1410 metadata: MessageMetadata::default(),
1411 });
1412
1413 let pool = agent
1414 .services
1415 .memory
1416 .persistence
1417 .memory
1418 .as_ref()
1419 .unwrap()
1420 .sqlite()
1421 .pool()
1422 .clone();
1423
1424 agent
1426 .enqueue_graph_extraction_task(
1427 "I prefer Rust for systems programming",
1428 false,
1429 false,
1430 )
1431 .await;
1432
1433 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1434
1435 let store = GraphStore::new(pool);
1437 let count = store.get_metadata("extraction_count").await.unwrap();
1438 assert!(
1439 count.is_some(),
1440 "conversational message must trigger extraction even with prior tool result in history"
1441 );
1442 }
1443 }
1444
1445 mod persona_extraction_guards {
1447 use super::*;
1448 use zeph_config::PersonaConfig;
1449 use zeph_llm::provider::MessageMetadata;
1450
1451 fn enabled_persona_config() -> PersonaConfig {
1452 PersonaConfig {
1453 enabled: true,
1454 min_messages: 1,
1455 ..PersonaConfig::default()
1456 }
1457 }
1458
1459 async fn agent_with_persona(
1460 provider: &AnyProvider,
1461 config: PersonaConfig,
1462 ) -> Agent<MockChannel> {
1463 let memory =
1464 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1465 let cid = memory.sqlite().create_conversation().await.unwrap();
1466 let mut agent = Agent::new(
1467 provider.clone(),
1468 MockChannel::new(vec![]),
1469 create_test_registry(),
1470 None,
1471 5,
1472 MockToolExecutor::no_tools(),
1473 )
1474 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1475 agent.services.memory.extraction.persona_config = config;
1476 agent
1477 }
1478
1479 #[tokio::test]
1480 async fn disabled_config_skips_spawn() {
1481 let provider = mock_provider(vec![]);
1483 let mut agent = agent_with_persona(
1484 &provider,
1485 PersonaConfig {
1486 enabled: false,
1487 ..PersonaConfig::default()
1488 },
1489 )
1490 .await;
1491
1492 agent.msg.messages.push(zeph_llm::provider::Message {
1494 role: Role::User,
1495 content: "I prefer Rust for systems programming".to_owned(),
1496 parts: vec![],
1497 metadata: MessageMetadata::default(),
1498 });
1499
1500 agent.enqueue_persona_extraction_task();
1501
1502 let store = agent
1503 .services
1504 .memory
1505 .persistence
1506 .memory
1507 .as_ref()
1508 .unwrap()
1509 .sqlite()
1510 .clone();
1511 let count = store.count_persona_facts().await.unwrap();
1512 assert_eq!(count, 0, "disabled persona config must not write any facts");
1513 }
1514
1515 #[tokio::test]
1516 async fn below_min_messages_skips_spawn() {
1517 let provider = mock_provider(vec![]);
1519 let mut agent = agent_with_persona(
1520 &provider,
1521 PersonaConfig {
1522 enabled: true,
1523 min_messages: 3,
1524 ..PersonaConfig::default()
1525 },
1526 )
1527 .await;
1528
1529 for text in ["I use Rust", "I prefer async code"] {
1530 agent.msg.messages.push(zeph_llm::provider::Message {
1531 role: Role::User,
1532 content: text.to_owned(),
1533 parts: vec![],
1534 metadata: MessageMetadata::default(),
1535 });
1536 }
1537
1538 agent.enqueue_persona_extraction_task();
1539
1540 let store = agent
1541 .services
1542 .memory
1543 .persistence
1544 .memory
1545 .as_ref()
1546 .unwrap()
1547 .sqlite()
1548 .clone();
1549 let count = store.count_persona_facts().await.unwrap();
1550 assert_eq!(
1551 count, 0,
1552 "below min_messages threshold must not trigger extraction"
1553 );
1554 }
1555
1556 #[tokio::test]
1557 async fn no_memory_skips_spawn() {
1558 let provider = mock_provider(vec![]);
1560 let channel = MockChannel::new(vec![]);
1561 let registry = create_test_registry();
1562 let executor = MockToolExecutor::no_tools();
1563 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1564 agent.services.memory.extraction.persona_config = enabled_persona_config();
1565 agent.msg.messages.push(zeph_llm::provider::Message {
1566 role: Role::User,
1567 content: "I like Rust".to_owned(),
1568 parts: vec![],
1569 metadata: MessageMetadata::default(),
1570 });
1571
1572 agent.enqueue_persona_extraction_task();
1574 }
1575
1576 #[tokio::test]
1577 async fn enabled_enough_messages_spawns_extraction() {
1578 use zeph_llm::mock::MockProvider;
1581 let (mock, recorded) = MockProvider::default().with_recording();
1582 let provider = AnyProvider::Mock(mock);
1583 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1584
1585 agent.msg.messages.push(zeph_llm::provider::Message {
1586 role: Role::User,
1587 content: "I prefer Rust for systems programming".to_owned(),
1588 parts: vec![],
1589 metadata: MessageMetadata::default(),
1590 });
1591
1592 agent.enqueue_persona_extraction_task();
1593
1594 agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1596
1597 let calls = recorded.lock().unwrap();
1598 assert!(
1599 !calls.is_empty(),
1600 "happy-path: provider.chat() must be called when extraction completes"
1601 );
1602 }
1603
1604 #[tokio::test]
1605 async fn messages_capped_at_eight() {
1606 use zeph_llm::mock::MockProvider;
1609 let (mock, recorded) = MockProvider::default().with_recording();
1610 let provider = AnyProvider::Mock(mock);
1611 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1612
1613 for i in 0..12u32 {
1614 agent.msg.messages.push(zeph_llm::provider::Message {
1615 role: Role::User,
1616 content: format!("I like message {i}"),
1617 parts: vec![],
1618 metadata: MessageMetadata::default(),
1619 });
1620 }
1621
1622 agent.enqueue_persona_extraction_task();
1623
1624 agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1626
1627 let calls = recorded.lock().unwrap();
1629 assert!(
1630 !calls.is_empty(),
1631 "extraction must run when enough messages present"
1632 );
1633 let prompt = &calls[0];
1635 let user_text = prompt
1636 .iter()
1637 .filter(|m| m.role == Role::User)
1638 .map(|m| m.content.as_str())
1639 .collect::<Vec<_>>()
1640 .join(" ");
1641 assert!(
1643 !user_text.contains("I like message 8"),
1644 "message index 8 must be excluded from extraction input"
1645 );
1646 }
1647
1648 #[test]
1649 fn long_message_truncated_at_char_boundary() {
1650 let long_content = "x".repeat(3000);
1654 let truncated = if long_content.len() > 2048 {
1655 long_content[..long_content.floor_char_boundary(2048)].to_owned()
1656 } else {
1657 long_content.clone()
1658 };
1659 assert_eq!(
1660 truncated.len(),
1661 2048,
1662 "ASCII content must be truncated to exactly 2048 bytes"
1663 );
1664
1665 let multi = "é".repeat(1500); let truncated_multi = if multi.len() > 2048 {
1668 multi[..multi.floor_char_boundary(2048)].to_owned()
1669 } else {
1670 multi.clone()
1671 };
1672 assert!(
1673 truncated_multi.len() <= 2048,
1674 "multi-byte content must not exceed 2048 bytes"
1675 );
1676 assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1677 }
1678 }
1679
1680 #[tokio::test]
1681 async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1682 let provider = mock_provider(vec![]);
1683 let channel = MockChannel::new(vec![]);
1684 let registry = create_test_registry();
1685 let executor = MockToolExecutor::no_tools();
1686
1687 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1688 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1689 let cid = memory.sqlite().create_conversation().await.unwrap();
1690
1691 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1693 .with_metrics(tx)
1694 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1695 agent.services.memory.persistence.autosave_assistant = false;
1696 agent.services.memory.persistence.autosave_min_length = 20;
1697
1698 let long_user_msg = "A".repeat(100);
1699 agent
1700 .persist_message(Role::User, &long_user_msg, &[], false)
1701 .await;
1702
1703 let history = agent
1704 .services
1705 .memory
1706 .persistence
1707 .memory
1708 .as_ref()
1709 .unwrap()
1710 .sqlite()
1711 .load_history(cid, 50)
1712 .await
1713 .unwrap();
1714 assert_eq!(history.len(), 1, "user message must be saved");
1715 assert_eq!(rx.borrow().sqlite_message_count, 1);
1718 }
1719
1720 #[tokio::test]
1724 async fn persist_message_saves_correct_tool_use_parts() {
1725 use zeph_llm::provider::MessagePart;
1726
1727 let provider = mock_provider(vec![]);
1728 let channel = MockChannel::new(vec![]);
1729 let registry = create_test_registry();
1730 let executor = MockToolExecutor::no_tools();
1731
1732 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1733 let cid = memory.sqlite().create_conversation().await.unwrap();
1734
1735 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1736 std::sync::Arc::new(memory),
1737 cid,
1738 50,
1739 5,
1740 100,
1741 );
1742
1743 let parts = vec![MessagePart::ToolUse {
1744 id: "call_abc123".to_string(),
1745 name: "read_file".to_string(),
1746 input: serde_json::json!({"path": "/tmp/test.txt"}),
1747 }];
1748 let content = "[tool_use: read_file(call_abc123)]";
1749
1750 agent
1751 .persist_message(Role::Assistant, content, &parts, false)
1752 .await;
1753
1754 let history = agent
1755 .services
1756 .memory
1757 .persistence
1758 .memory
1759 .as_ref()
1760 .unwrap()
1761 .sqlite()
1762 .load_history(cid, 50)
1763 .await
1764 .unwrap();
1765
1766 assert_eq!(history.len(), 1);
1767 assert_eq!(history[0].role, Role::Assistant);
1768 assert_eq!(history[0].content, content);
1769 assert_eq!(history[0].parts.len(), 1);
1770 match &history[0].parts[0] {
1771 MessagePart::ToolUse { id, name, .. } => {
1772 assert_eq!(id, "call_abc123");
1773 assert_eq!(name, "read_file");
1774 }
1775 other => panic!("expected ToolUse part, got {other:?}"),
1776 }
1777 assert!(
1779 !history[0]
1780 .parts
1781 .iter()
1782 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1783 "assistant message must not contain ToolResult parts"
1784 );
1785 }
1786
1787 #[tokio::test]
1788 async fn persist_message_saves_correct_tool_result_parts() {
1789 use zeph_llm::provider::MessagePart;
1790
1791 let provider = mock_provider(vec![]);
1792 let channel = MockChannel::new(vec![]);
1793 let registry = create_test_registry();
1794 let executor = MockToolExecutor::no_tools();
1795
1796 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1797 let cid = memory.sqlite().create_conversation().await.unwrap();
1798
1799 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1800 std::sync::Arc::new(memory),
1801 cid,
1802 50,
1803 5,
1804 100,
1805 );
1806
1807 let parts = vec![MessagePart::ToolResult {
1808 tool_use_id: "call_abc123".to_string(),
1809 content: "file contents here".to_string(),
1810 is_error: false,
1811 }];
1812 let content = "[tool_result: call_abc123]\nfile contents here";
1813
1814 agent
1815 .persist_message(Role::User, content, &parts, false)
1816 .await;
1817
1818 let history = agent
1819 .services
1820 .memory
1821 .persistence
1822 .memory
1823 .as_ref()
1824 .unwrap()
1825 .sqlite()
1826 .load_history(cid, 50)
1827 .await
1828 .unwrap();
1829
1830 assert_eq!(history.len(), 1);
1831 assert_eq!(history[0].role, Role::User);
1832 assert_eq!(history[0].content, content);
1833 assert_eq!(history[0].parts.len(), 1);
1834 match &history[0].parts[0] {
1835 MessagePart::ToolResult {
1836 tool_use_id,
1837 content: result_content,
1838 is_error,
1839 } => {
1840 assert_eq!(tool_use_id, "call_abc123");
1841 assert_eq!(result_content, "file contents here");
1842 assert!(!is_error);
1843 }
1844 other => panic!("expected ToolResult part, got {other:?}"),
1845 }
1846 assert!(
1848 !history[0]
1849 .parts
1850 .iter()
1851 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1852 "user ToolResult message must not contain ToolUse parts"
1853 );
1854 }
1855
1856 #[tokio::test]
1857 async fn persist_message_roundtrip_preserves_role_part_alignment() {
1858 use zeph_llm::provider::MessagePart;
1859
1860 let provider = mock_provider(vec![]);
1861 let channel = MockChannel::new(vec![]);
1862 let registry = create_test_registry();
1863 let executor = MockToolExecutor::no_tools();
1864
1865 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1866 let cid = memory.sqlite().create_conversation().await.unwrap();
1867
1868 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1869 std::sync::Arc::new(memory),
1870 cid,
1871 50,
1872 5,
1873 100,
1874 );
1875
1876 let assistant_parts = vec![MessagePart::ToolUse {
1878 id: "id_1".to_string(),
1879 name: "list_dir".to_string(),
1880 input: serde_json::json!({"path": "/tmp"}),
1881 }];
1882 agent
1883 .persist_message(
1884 Role::Assistant,
1885 "[tool_use: list_dir(id_1)]",
1886 &assistant_parts,
1887 false,
1888 )
1889 .await;
1890
1891 let user_parts = vec![MessagePart::ToolResult {
1893 tool_use_id: "id_1".to_string(),
1894 content: "file1.txt\nfile2.txt".to_string(),
1895 is_error: false,
1896 }];
1897 agent
1898 .persist_message(
1899 Role::User,
1900 "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1901 &user_parts,
1902 false,
1903 )
1904 .await;
1905
1906 let history = agent
1907 .services
1908 .memory
1909 .persistence
1910 .memory
1911 .as_ref()
1912 .unwrap()
1913 .sqlite()
1914 .load_history(cid, 50)
1915 .await
1916 .unwrap();
1917
1918 assert_eq!(history.len(), 2);
1919
1920 assert_eq!(history[0].role, Role::Assistant);
1922 assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1923 assert!(
1924 matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1925 "first message must be assistant ToolUse"
1926 );
1927
1928 assert_eq!(history[1].role, Role::User);
1930 assert_eq!(
1931 history[1].content,
1932 "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1933 );
1934 assert!(
1935 matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1936 "second message must be user ToolResult"
1937 );
1938
1939 assert!(
1941 !history[0]
1942 .parts
1943 .iter()
1944 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1945 "assistant message must not have ToolResult parts"
1946 );
1947 assert!(
1948 !history[1]
1949 .parts
1950 .iter()
1951 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1952 "user message must not have ToolUse parts"
1953 );
1954 }
1955
1956 #[tokio::test]
1957 async fn persist_message_saves_correct_tool_output_parts() {
1958 use zeph_llm::provider::MessagePart;
1959
1960 let provider = mock_provider(vec![]);
1961 let channel = MockChannel::new(vec![]);
1962 let registry = create_test_registry();
1963 let executor = MockToolExecutor::no_tools();
1964
1965 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1966 let cid = memory.sqlite().create_conversation().await.unwrap();
1967
1968 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1969 std::sync::Arc::new(memory),
1970 cid,
1971 50,
1972 5,
1973 100,
1974 );
1975
1976 let parts = vec![MessagePart::ToolOutput {
1977 tool_name: "shell".into(),
1978 body: "hello from shell".to_string(),
1979 compacted_at: None,
1980 }];
1981 let content = "[tool: shell]\nhello from shell";
1982
1983 agent
1984 .persist_message(Role::User, content, &parts, false)
1985 .await;
1986
1987 let history = agent
1988 .services
1989 .memory
1990 .persistence
1991 .memory
1992 .as_ref()
1993 .unwrap()
1994 .sqlite()
1995 .load_history(cid, 50)
1996 .await
1997 .unwrap();
1998
1999 assert_eq!(history.len(), 1);
2000 assert_eq!(history[0].role, Role::User);
2001 assert_eq!(history[0].content, content);
2002 assert_eq!(history[0].parts.len(), 1);
2003 match &history[0].parts[0] {
2004 MessagePart::ToolOutput {
2005 tool_name,
2006 body,
2007 compacted_at,
2008 } => {
2009 assert_eq!(tool_name, "shell");
2010 assert_eq!(body, "hello from shell");
2011 assert!(compacted_at.is_none());
2012 }
2013 other => panic!("expected ToolOutput part, got {other:?}"),
2014 }
2015 }
2016
2017 #[tokio::test]
2020 async fn load_history_removes_trailing_orphan_tool_use() {
2021 use zeph_llm::provider::MessagePart;
2022
2023 let provider = mock_provider(vec![]);
2024 let channel = MockChannel::new(vec![]);
2025 let registry = create_test_registry();
2026 let executor = MockToolExecutor::no_tools();
2027
2028 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2029 let cid = memory.sqlite().create_conversation().await.unwrap();
2030 let sqlite = memory.sqlite();
2031
2032 sqlite
2034 .save_message(cid, "user", "do something with a tool")
2035 .await
2036 .unwrap();
2037
2038 let parts = serde_json::to_string(&[MessagePart::ToolUse {
2040 id: "call_orphan".to_string(),
2041 name: "shell".to_string(),
2042 input: serde_json::json!({"command": "ls"}),
2043 }])
2044 .unwrap();
2045 sqlite
2046 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2047 .await
2048 .unwrap();
2049
2050 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2051 std::sync::Arc::new(memory),
2052 cid,
2053 50,
2054 5,
2055 100,
2056 );
2057
2058 let messages_before = agent.msg.messages.len();
2059 agent.load_history().await.unwrap();
2060
2061 assert_eq!(
2063 agent.msg.messages.len(),
2064 messages_before + 1,
2065 "orphaned trailing tool_use must be removed"
2066 );
2067 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2068 }
2069
2070 #[tokio::test]
2071 async fn load_history_removes_leading_orphan_tool_result() {
2072 use zeph_llm::provider::MessagePart;
2073
2074 let provider = mock_provider(vec![]);
2075 let channel = MockChannel::new(vec![]);
2076 let registry = create_test_registry();
2077 let executor = MockToolExecutor::no_tools();
2078
2079 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2080 let cid = memory.sqlite().create_conversation().await.unwrap();
2081 let sqlite = memory.sqlite();
2082
2083 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2085 tool_use_id: "call_missing".to_string(),
2086 content: "result data".to_string(),
2087 is_error: false,
2088 }])
2089 .unwrap();
2090 sqlite
2091 .save_message_with_parts(
2092 cid,
2093 "user",
2094 "[tool_result: call_missing]\nresult data",
2095 &result_parts,
2096 )
2097 .await
2098 .unwrap();
2099
2100 sqlite
2102 .save_message(cid, "assistant", "here is my response")
2103 .await
2104 .unwrap();
2105
2106 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2107 std::sync::Arc::new(memory),
2108 cid,
2109 50,
2110 5,
2111 100,
2112 );
2113
2114 let messages_before = agent.msg.messages.len();
2115 agent.load_history().await.unwrap();
2116
2117 assert_eq!(
2119 agent.msg.messages.len(),
2120 messages_before + 1,
2121 "orphaned leading tool_result must be removed"
2122 );
2123 assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2124 }
2125
2126 #[tokio::test]
2127 async fn load_history_preserves_complete_tool_pairs() {
2128 use zeph_llm::provider::MessagePart;
2129
2130 let provider = mock_provider(vec![]);
2131 let channel = MockChannel::new(vec![]);
2132 let registry = create_test_registry();
2133 let executor = MockToolExecutor::no_tools();
2134
2135 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2136 let cid = memory.sqlite().create_conversation().await.unwrap();
2137 let sqlite = memory.sqlite();
2138
2139 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2141 id: "call_ok".to_string(),
2142 name: "shell".to_string(),
2143 input: serde_json::json!({"command": "pwd"}),
2144 }])
2145 .unwrap();
2146 sqlite
2147 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2148 .await
2149 .unwrap();
2150
2151 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2152 tool_use_id: "call_ok".to_string(),
2153 content: "/home/user".to_string(),
2154 is_error: false,
2155 }])
2156 .unwrap();
2157 sqlite
2158 .save_message_with_parts(
2159 cid,
2160 "user",
2161 "[tool_result: call_ok]\n/home/user",
2162 &result_parts,
2163 )
2164 .await
2165 .unwrap();
2166
2167 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2168 std::sync::Arc::new(memory),
2169 cid,
2170 50,
2171 5,
2172 100,
2173 );
2174
2175 let messages_before = agent.msg.messages.len();
2176 agent.load_history().await.unwrap();
2177
2178 assert_eq!(
2180 agent.msg.messages.len(),
2181 messages_before + 2,
2182 "complete tool_use/tool_result pair must be preserved"
2183 );
2184 assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2185 assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2186 }
2187
2188 #[tokio::test]
2189 async fn load_history_handles_multiple_trailing_orphans() {
2190 use zeph_llm::provider::MessagePart;
2191
2192 let provider = mock_provider(vec![]);
2193 let channel = MockChannel::new(vec![]);
2194 let registry = create_test_registry();
2195 let executor = MockToolExecutor::no_tools();
2196
2197 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2198 let cid = memory.sqlite().create_conversation().await.unwrap();
2199 let sqlite = memory.sqlite();
2200
2201 sqlite.save_message(cid, "user", "start").await.unwrap();
2203
2204 let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2206 id: "call_1".to_string(),
2207 name: "shell".to_string(),
2208 input: serde_json::json!({}),
2209 }])
2210 .unwrap();
2211 sqlite
2212 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2213 .await
2214 .unwrap();
2215
2216 let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2218 id: "call_2".to_string(),
2219 name: "read_file".to_string(),
2220 input: serde_json::json!({}),
2221 }])
2222 .unwrap();
2223 sqlite
2224 .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2225 .await
2226 .unwrap();
2227
2228 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2229 std::sync::Arc::new(memory),
2230 cid,
2231 50,
2232 5,
2233 100,
2234 );
2235
2236 let messages_before = agent.msg.messages.len();
2237 agent.load_history().await.unwrap();
2238
2239 assert_eq!(
2241 agent.msg.messages.len(),
2242 messages_before + 1,
2243 "all trailing orphaned tool_use messages must be removed"
2244 );
2245 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2246 }
2247
2248 #[tokio::test]
2249 async fn load_history_no_tool_messages_unchanged() {
2250 let provider = mock_provider(vec![]);
2251 let channel = MockChannel::new(vec![]);
2252 let registry = create_test_registry();
2253 let executor = MockToolExecutor::no_tools();
2254
2255 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2256 let cid = memory.sqlite().create_conversation().await.unwrap();
2257 let sqlite = memory.sqlite();
2258
2259 sqlite.save_message(cid, "user", "hello").await.unwrap();
2260 sqlite
2261 .save_message(cid, "assistant", "hi there")
2262 .await
2263 .unwrap();
2264 sqlite
2265 .save_message(cid, "user", "how are you?")
2266 .await
2267 .unwrap();
2268
2269 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2270 std::sync::Arc::new(memory),
2271 cid,
2272 50,
2273 5,
2274 100,
2275 );
2276
2277 let messages_before = agent.msg.messages.len();
2278 agent.load_history().await.unwrap();
2279
2280 assert_eq!(
2282 agent.msg.messages.len(),
2283 messages_before + 3,
2284 "plain messages without tool parts must pass through unchanged"
2285 );
2286 }
2287
2288 #[tokio::test]
2289 async fn load_history_removes_both_leading_and_trailing_orphans() {
2290 use zeph_llm::provider::MessagePart;
2291
2292 let provider = mock_provider(vec![]);
2293 let channel = MockChannel::new(vec![]);
2294 let registry = create_test_registry();
2295 let executor = MockToolExecutor::no_tools();
2296
2297 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2298 let cid = memory.sqlite().create_conversation().await.unwrap();
2299 let sqlite = memory.sqlite();
2300
2301 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2303 tool_use_id: "call_leading".to_string(),
2304 content: "orphaned result".to_string(),
2305 is_error: false,
2306 }])
2307 .unwrap();
2308 sqlite
2309 .save_message_with_parts(
2310 cid,
2311 "user",
2312 "[tool_result: call_leading]\norphaned result",
2313 &result_parts,
2314 )
2315 .await
2316 .unwrap();
2317
2318 sqlite
2320 .save_message(cid, "user", "what is 2+2?")
2321 .await
2322 .unwrap();
2323 sqlite.save_message(cid, "assistant", "4").await.unwrap();
2324
2325 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2327 id: "call_trailing".to_string(),
2328 name: "shell".to_string(),
2329 input: serde_json::json!({"command": "date"}),
2330 }])
2331 .unwrap();
2332 sqlite
2333 .save_message_with_parts(
2334 cid,
2335 "assistant",
2336 "[tool_use: shell(call_trailing)]",
2337 &use_parts,
2338 )
2339 .await
2340 .unwrap();
2341
2342 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2343 std::sync::Arc::new(memory),
2344 cid,
2345 50,
2346 5,
2347 100,
2348 );
2349
2350 let messages_before = agent.msg.messages.len();
2351 agent.load_history().await.unwrap();
2352
2353 assert_eq!(
2355 agent.msg.messages.len(),
2356 messages_before + 2,
2357 "both leading and trailing orphans must be removed"
2358 );
2359 assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2360 assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2361 assert_eq!(
2362 agent.msg.messages[messages_before + 1].role,
2363 Role::Assistant
2364 );
2365 assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2366 }
2367
2368 #[tokio::test]
2373 async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2374 use zeph_llm::provider::MessagePart;
2375
2376 let provider = mock_provider(vec![]);
2377 let channel = MockChannel::new(vec![]);
2378 let registry = create_test_registry();
2379 let executor = MockToolExecutor::no_tools();
2380
2381 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2382 let cid = memory.sqlite().create_conversation().await.unwrap();
2383 let sqlite = memory.sqlite();
2384
2385 sqlite
2387 .save_message(cid, "user", "first question")
2388 .await
2389 .unwrap();
2390 sqlite
2391 .save_message(cid, "assistant", "first answer")
2392 .await
2393 .unwrap();
2394
2395 let use_parts = serde_json::to_string(&[
2399 MessagePart::ToolUse {
2400 id: "call_mid_1".to_string(),
2401 name: "shell".to_string(),
2402 input: serde_json::json!({"command": "ls"}),
2403 },
2404 MessagePart::Text {
2405 text: "Let me check the files.".to_string(),
2406 },
2407 ])
2408 .unwrap();
2409 sqlite
2410 .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2411 .await
2412 .unwrap();
2413
2414 sqlite
2416 .save_message(cid, "user", "second question")
2417 .await
2418 .unwrap();
2419 sqlite
2420 .save_message(cid, "assistant", "second answer")
2421 .await
2422 .unwrap();
2423
2424 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2425 std::sync::Arc::new(memory),
2426 cid,
2427 50,
2428 5,
2429 100,
2430 );
2431
2432 let messages_before = agent.msg.messages.len();
2433 agent.load_history().await.unwrap();
2434
2435 assert_eq!(
2438 agent.msg.messages.len(),
2439 messages_before + 5,
2440 "message count must be 5 (orphan message kept — has text content)"
2441 );
2442
2443 let orphan = &agent.msg.messages[messages_before + 2];
2445 assert_eq!(orphan.role, Role::Assistant);
2446 assert!(
2447 !orphan
2448 .parts
2449 .iter()
2450 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2451 "orphaned ToolUse parts must be stripped from mid-history message"
2452 );
2453 assert!(
2455 orphan.parts.iter().any(
2456 |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2457 ),
2458 "text content of orphaned assistant message must be preserved"
2459 );
2460 }
2461
2462 #[tokio::test]
2467 async fn load_history_keeps_tool_only_user_message() {
2468 use zeph_llm::provider::MessagePart;
2469
2470 let provider = mock_provider(vec![]);
2471 let channel = MockChannel::new(vec![]);
2472 let registry = create_test_registry();
2473 let executor = MockToolExecutor::no_tools();
2474
2475 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2476 let cid = memory.sqlite().create_conversation().await.unwrap();
2477 let sqlite = memory.sqlite();
2478
2479 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2481 id: "call_rc3".to_string(),
2482 name: "memory_save".to_string(),
2483 input: serde_json::json!({"content": "something"}),
2484 }])
2485 .unwrap();
2486 sqlite
2487 .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2488 .await
2489 .unwrap();
2490
2491 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2493 tool_use_id: "call_rc3".to_string(),
2494 content: "saved".to_string(),
2495 is_error: false,
2496 }])
2497 .unwrap();
2498 sqlite
2499 .save_message_with_parts(cid, "user", "", &result_parts)
2500 .await
2501 .unwrap();
2502
2503 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2504
2505 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2506 std::sync::Arc::new(memory),
2507 cid,
2508 50,
2509 5,
2510 100,
2511 );
2512
2513 let messages_before = agent.msg.messages.len();
2514 agent.load_history().await.unwrap();
2515
2516 assert_eq!(
2519 agent.msg.messages.len(),
2520 messages_before + 3,
2521 "user message with empty content but ToolResult parts must not be dropped"
2522 );
2523
2524 let user_msg = &agent.msg.messages[messages_before + 1];
2526 assert_eq!(user_msg.role, Role::User);
2527 assert!(
2528 user_msg.parts.iter().any(
2529 |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2530 ),
2531 "ToolResult part must be preserved on user message with empty content"
2532 );
2533 }
2534
2535 #[tokio::test]
2539 async fn strip_orphans_removes_orphaned_tool_result() {
2540 use zeph_llm::provider::MessagePart;
2541
2542 let provider = mock_provider(vec![]);
2543 let channel = MockChannel::new(vec![]);
2544 let registry = create_test_registry();
2545 let executor = MockToolExecutor::no_tools();
2546
2547 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2548 let cid = memory.sqlite().create_conversation().await.unwrap();
2549 let sqlite = memory.sqlite();
2550
2551 sqlite.save_message(cid, "user", "hello").await.unwrap();
2553 sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2554
2555 sqlite
2557 .save_message(cid, "assistant", "plain answer")
2558 .await
2559 .unwrap();
2560
2561 let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2563 tool_use_id: "call_nonexistent".to_string(),
2564 content: "stale result".to_string(),
2565 is_error: false,
2566 }])
2567 .unwrap();
2568 sqlite
2569 .save_message_with_parts(
2570 cid,
2571 "user",
2572 "[tool_result: call_nonexistent]\nstale result",
2573 &orphan_result_parts,
2574 )
2575 .await
2576 .unwrap();
2577
2578 sqlite
2579 .save_message(cid, "assistant", "final")
2580 .await
2581 .unwrap();
2582
2583 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2584 std::sync::Arc::new(memory),
2585 cid,
2586 50,
2587 5,
2588 100,
2589 );
2590
2591 let messages_before = agent.msg.messages.len();
2592 agent.load_history().await.unwrap();
2593
2594 let loaded = &agent.msg.messages[messages_before..];
2598 for msg in loaded {
2599 assert!(
2600 !msg.parts.iter().any(|p| matches!(
2601 p,
2602 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2603 )),
2604 "orphaned ToolResult part must be stripped from history"
2605 );
2606 }
2607 }
2608
2609 #[tokio::test]
2612 async fn strip_orphans_keeps_complete_pair() {
2613 use zeph_llm::provider::MessagePart;
2614
2615 let provider = mock_provider(vec![]);
2616 let channel = MockChannel::new(vec![]);
2617 let registry = create_test_registry();
2618 let executor = MockToolExecutor::no_tools();
2619
2620 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2621 let cid = memory.sqlite().create_conversation().await.unwrap();
2622 let sqlite = memory.sqlite();
2623
2624 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2625 id: "call_valid".to_string(),
2626 name: "shell".to_string(),
2627 input: serde_json::json!({"command": "ls"}),
2628 }])
2629 .unwrap();
2630 sqlite
2631 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2632 .await
2633 .unwrap();
2634
2635 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2636 tool_use_id: "call_valid".to_string(),
2637 content: "file.rs".to_string(),
2638 is_error: false,
2639 }])
2640 .unwrap();
2641 sqlite
2642 .save_message_with_parts(cid, "user", "", &result_parts)
2643 .await
2644 .unwrap();
2645
2646 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2647 std::sync::Arc::new(memory),
2648 cid,
2649 50,
2650 5,
2651 100,
2652 );
2653
2654 let messages_before = agent.msg.messages.len();
2655 agent.load_history().await.unwrap();
2656
2657 assert_eq!(
2658 agent.msg.messages.len(),
2659 messages_before + 2,
2660 "complete tool_use/tool_result pair must be preserved"
2661 );
2662
2663 let user_msg = &agent.msg.messages[messages_before + 1];
2664 assert!(
2665 user_msg.parts.iter().any(|p| matches!(
2666 p,
2667 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2668 )),
2669 "ToolResult part for a matched tool_use must not be stripped"
2670 );
2671 }
2672
2673 #[tokio::test]
2676 async fn strip_orphans_mixed_history() {
2677 use zeph_llm::provider::MessagePart;
2678
2679 let provider = mock_provider(vec![]);
2680 let channel = MockChannel::new(vec![]);
2681 let registry = create_test_registry();
2682 let executor = MockToolExecutor::no_tools();
2683
2684 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2685 let cid = memory.sqlite().create_conversation().await.unwrap();
2686 let sqlite = memory.sqlite();
2687
2688 let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2690 id: "call_good".to_string(),
2691 name: "shell".to_string(),
2692 input: serde_json::json!({"command": "pwd"}),
2693 }])
2694 .unwrap();
2695 sqlite
2696 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2697 .await
2698 .unwrap();
2699
2700 let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2701 tool_use_id: "call_good".to_string(),
2702 content: "/home".to_string(),
2703 is_error: false,
2704 }])
2705 .unwrap();
2706 sqlite
2707 .save_message_with_parts(cid, "user", "", &result_parts_ok)
2708 .await
2709 .unwrap();
2710
2711 sqlite
2713 .save_message(cid, "assistant", "text only")
2714 .await
2715 .unwrap();
2716
2717 let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2718 tool_use_id: "call_ghost".to_string(),
2719 content: "ghost result".to_string(),
2720 is_error: false,
2721 }])
2722 .unwrap();
2723 sqlite
2724 .save_message_with_parts(
2725 cid,
2726 "user",
2727 "[tool_result: call_ghost]\nghost result",
2728 &orphan_parts,
2729 )
2730 .await
2731 .unwrap();
2732
2733 sqlite
2734 .save_message(cid, "assistant", "final reply")
2735 .await
2736 .unwrap();
2737
2738 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2739 std::sync::Arc::new(memory),
2740 cid,
2741 50,
2742 5,
2743 100,
2744 );
2745
2746 let messages_before = agent.msg.messages.len();
2747 agent.load_history().await.unwrap();
2748
2749 let loaded = &agent.msg.messages[messages_before..];
2750
2751 for msg in loaded {
2753 assert!(
2754 !msg.parts.iter().any(|p| matches!(
2755 p,
2756 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2757 )),
2758 "orphaned ToolResult (call_ghost) must be stripped from history"
2759 );
2760 }
2761
2762 let has_good_result = loaded.iter().any(|msg| {
2765 msg.role == Role::User
2766 && msg.parts.iter().any(|p| {
2767 matches!(
2768 p,
2769 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2770 )
2771 })
2772 });
2773 assert!(
2774 has_good_result,
2775 "matched ToolResult (call_good) must be preserved in history"
2776 );
2777 }
2778
2779 #[tokio::test]
2782 async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2783 use zeph_llm::provider::MessagePart;
2784
2785 let provider = mock_provider(vec![]);
2786 let channel = MockChannel::new(vec![]);
2787 let registry = create_test_registry();
2788 let executor = MockToolExecutor::no_tools();
2789
2790 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2791 let cid = memory.sqlite().create_conversation().await.unwrap();
2792 let sqlite = memory.sqlite();
2793
2794 sqlite
2795 .save_message(cid, "user", "run a command")
2796 .await
2797 .unwrap();
2798
2799 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2801 id: "call_ok".to_string(),
2802 name: "shell".to_string(),
2803 input: serde_json::json!({"command": "echo hi"}),
2804 }])
2805 .unwrap();
2806 sqlite
2807 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2808 .await
2809 .unwrap();
2810
2811 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2813 tool_use_id: "call_ok".to_string(),
2814 content: "hi".to_string(),
2815 is_error: false,
2816 }])
2817 .unwrap();
2818 sqlite
2819 .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2820 .await
2821 .unwrap();
2822
2823 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2824
2825 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2826 std::sync::Arc::new(memory),
2827 cid,
2828 50,
2829 5,
2830 100,
2831 );
2832
2833 let messages_before = agent.msg.messages.len();
2834 agent.load_history().await.unwrap();
2835
2836 assert_eq!(
2838 agent.msg.messages.len(),
2839 messages_before + 4,
2840 "matched tool pair must not be removed"
2841 );
2842 let tool_msg = &agent.msg.messages[messages_before + 1];
2843 assert!(
2844 tool_msg
2845 .parts
2846 .iter()
2847 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2848 "matched ToolUse parts must be preserved"
2849 );
2850 }
2851
2852 #[tokio::test]
2856 async fn persist_cancelled_tool_results_pairs_tool_use() {
2857 use zeph_llm::provider::MessagePart;
2858
2859 let provider = mock_provider(vec![]);
2860 let channel = MockChannel::new(vec![]);
2861 let registry = create_test_registry();
2862 let executor = MockToolExecutor::no_tools();
2863
2864 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2865 let cid = memory.sqlite().create_conversation().await.unwrap();
2866
2867 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2868 std::sync::Arc::new(memory),
2869 cid,
2870 50,
2871 5,
2872 100,
2873 );
2874
2875 let tool_calls = vec![
2877 zeph_llm::provider::ToolUseRequest {
2878 id: "cancel_id_1".to_string(),
2879 name: "shell".to_string().into(),
2880 input: serde_json::json!({}),
2881 },
2882 zeph_llm::provider::ToolUseRequest {
2883 id: "cancel_id_2".to_string(),
2884 name: "read_file".to_string().into(),
2885 input: serde_json::json!({}),
2886 },
2887 ];
2888
2889 agent.persist_cancelled_tool_results(&tool_calls).await;
2890
2891 let history = agent
2892 .services
2893 .memory
2894 .persistence
2895 .memory
2896 .as_ref()
2897 .unwrap()
2898 .sqlite()
2899 .load_history(cid, 50)
2900 .await
2901 .unwrap();
2902
2903 assert_eq!(history.len(), 1);
2905 assert_eq!(history[0].role, Role::User);
2906
2907 for tc in &tool_calls {
2909 assert!(
2910 history[0].parts.iter().any(|p| matches!(
2911 p,
2912 MessagePart::ToolResult { tool_use_id, is_error, .. }
2913 if tool_use_id == &tc.id && *is_error
2914 )),
2915 "tombstone ToolResult for {} must be present and is_error=true",
2916 tc.id
2917 );
2918 }
2919 }
2920
2921 #[tokio::test]
2932 async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
2933 use zeph_llm::provider::MessagePart;
2934
2935 let provider = mock_provider(vec![]);
2936 let channel = MockChannel::new(vec![]);
2937 let registry = create_test_registry();
2938 let executor = MockToolExecutor::no_tools();
2939
2940 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2941 let cid = memory.sqlite().create_conversation().await.unwrap();
2942 let sqlite = memory.sqlite();
2943
2944 sqlite
2946 .save_message(cid, "user", "save this for me")
2947 .await
2948 .unwrap();
2949
2950 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2953 id: "call_2529".to_string(),
2954 name: "memory_save".to_string(),
2955 input: serde_json::json!({"content": "save this"}),
2956 }])
2957 .unwrap();
2958 let orphan_assistant_id = sqlite
2959 .save_message_with_parts(
2960 cid,
2961 "assistant",
2962 "[tool_use: memory_save(call_2529)]",
2963 &use_parts,
2964 )
2965 .await
2966 .unwrap();
2967
2968 sqlite
2973 .save_message(cid, "assistant", "here is a plain reply")
2974 .await
2975 .unwrap();
2976
2977 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2978 tool_use_id: "call_2529".to_string(),
2979 content: "saved".to_string(),
2980 is_error: false,
2981 }])
2982 .unwrap();
2983 let orphan_user_id = sqlite
2984 .save_message_with_parts(
2985 cid,
2986 "user",
2987 "[tool_result: call_2529]\nsaved",
2988 &result_parts,
2989 )
2990 .await
2991 .unwrap();
2992
2993 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2995
2996 let memory_arc = std::sync::Arc::new(memory);
2997 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2998 memory_arc.clone(),
2999 cid,
3000 50,
3001 5,
3002 100,
3003 );
3004
3005 agent.load_history().await.unwrap();
3006
3007 let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3010 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3011 )
3012 .bind(orphan_assistant_id)
3013 .fetch_all(memory_arc.sqlite().pool())
3014 .await
3015 .unwrap();
3016
3017 let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3018 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3019 )
3020 .bind(orphan_user_id)
3021 .fetch_all(memory_arc.sqlite().pool())
3022 .await
3023 .unwrap();
3024
3025 assert_eq!(
3026 assistant_deleted_count.first().copied().unwrap_or(0),
3027 1,
3028 "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3029 );
3030 assert_eq!(
3031 user_deleted_count.first().copied().unwrap_or(0),
3032 1,
3033 "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3034 );
3035 }
3036
3037 #[tokio::test]
3041 async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3042 use zeph_llm::provider::MessagePart;
3043
3044 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3045 let cid = memory.sqlite().create_conversation().await.unwrap();
3046 let sqlite = memory.sqlite();
3047
3048 sqlite
3050 .save_message(cid, "user", "do something")
3051 .await
3052 .unwrap();
3053
3054 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3056 id: "call_idem".to_string(),
3057 name: "shell".to_string(),
3058 input: serde_json::json!({"command": "ls"}),
3059 }])
3060 .unwrap();
3061 sqlite
3062 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3063 .await
3064 .unwrap();
3065
3066 sqlite
3068 .save_message(cid, "assistant", "continuing")
3069 .await
3070 .unwrap();
3071
3072 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3074 tool_use_id: "call_idem".to_string(),
3075 content: "output".to_string(),
3076 is_error: false,
3077 }])
3078 .unwrap();
3079 sqlite
3080 .save_message_with_parts(
3081 cid,
3082 "user",
3083 "[tool_result: call_idem]\noutput",
3084 &result_parts,
3085 )
3086 .await
3087 .unwrap();
3088
3089 sqlite
3090 .save_message(cid, "assistant", "final")
3091 .await
3092 .unwrap();
3093
3094 let memory_arc = std::sync::Arc::new(memory);
3095
3096 let mut agent1 = Agent::new(
3098 mock_provider(vec![]),
3099 MockChannel::new(vec![]),
3100 create_test_registry(),
3101 None,
3102 5,
3103 MockToolExecutor::no_tools(),
3104 )
3105 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3106 agent1.load_history().await.unwrap();
3107 let count_after_first = agent1.msg.messages.len();
3108
3109 let mut agent2 = Agent::new(
3112 mock_provider(vec![]),
3113 MockChannel::new(vec![]),
3114 create_test_registry(),
3115 None,
3116 5,
3117 MockToolExecutor::no_tools(),
3118 )
3119 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3120 agent2.load_history().await.unwrap();
3121 let count_after_second = agent2.msg.messages.len();
3122
3123 assert_eq!(
3125 count_after_first, count_after_second,
3126 "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3127 );
3128 }
3129
3130 #[tokio::test]
3134 async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3135 use zeph_llm::provider::MessagePart;
3136
3137 let provider = mock_provider(vec![]);
3138 let channel = MockChannel::new(vec![]);
3139 let registry = create_test_registry();
3140 let executor = MockToolExecutor::no_tools();
3141
3142 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3143 let cid = memory.sqlite().create_conversation().await.unwrap();
3144 let sqlite = memory.sqlite();
3145
3146 sqlite
3148 .save_message(cid, "user", "check the files")
3149 .await
3150 .unwrap();
3151
3152 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3155 id: "call_mixed".to_string(),
3156 name: "shell".to_string(),
3157 input: serde_json::json!({"command": "ls"}),
3158 }])
3159 .unwrap();
3160 sqlite
3161 .save_message_with_parts(
3162 cid,
3163 "assistant",
3164 "Let me list the directory. [tool_use: shell(call_mixed)]",
3165 &use_parts,
3166 )
3167 .await
3168 .unwrap();
3169
3170 sqlite.save_message(cid, "user", "thanks").await.unwrap();
3172 sqlite
3173 .save_message(cid, "assistant", "you are welcome")
3174 .await
3175 .unwrap();
3176
3177 let memory_arc = std::sync::Arc::new(memory);
3178 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3179 memory_arc.clone(),
3180 cid,
3181 50,
3182 5,
3183 100,
3184 );
3185
3186 let messages_before = agent.msg.messages.len();
3187 agent.load_history().await.unwrap();
3188
3189 assert_eq!(
3191 agent.msg.messages.len(),
3192 messages_before + 4,
3193 "assistant message with text + tool tag must not be removed after ToolUse strip"
3194 );
3195
3196 let mixed_msg = agent
3198 .msg
3199 .messages
3200 .iter()
3201 .find(|m| m.content.contains("Let me list the directory"))
3202 .expect("mixed-content assistant message must still be in history");
3203 assert!(
3204 !mixed_msg
3205 .parts
3206 .iter()
3207 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3208 "orphaned ToolUse parts must be stripped even when message has meaningful text"
3209 );
3210 assert_eq!(
3211 mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3212 "content field must be unchanged — only parts are stripped"
3213 );
3214 }
3215
3216 #[tokio::test]
3219 async fn persist_message_skipped_tool_result_does_not_embed() {
3220 use zeph_llm::provider::MessagePart;
3221
3222 let provider = mock_provider(vec![]);
3223 let channel = MockChannel::new(vec![]);
3224 let registry = create_test_registry();
3225 let executor = MockToolExecutor::no_tools();
3226
3227 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3228 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3229 let cid = memory.sqlite().create_conversation().await.unwrap();
3230
3231 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3232 .with_metrics(tx)
3233 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3234 agent.services.memory.persistence.autosave_assistant = true;
3235 agent.services.memory.persistence.autosave_min_length = 0;
3236
3237 let parts = vec![MessagePart::ToolResult {
3238 tool_use_id: "tu1".into(),
3239 content: "[skipped] bash tool was blocked by utility gate".into(),
3240 is_error: false,
3241 }];
3242
3243 agent
3244 .persist_message(
3245 Role::User,
3246 "[skipped] bash tool was blocked by utility gate",
3247 &parts,
3248 false,
3249 )
3250 .await;
3251
3252 assert_eq!(
3253 rx.borrow().embeddings_generated,
3254 0,
3255 "[skipped] ToolResult must not be embedded into Qdrant"
3256 );
3257 }
3258
3259 #[tokio::test]
3260 async fn persist_message_stopped_tool_result_does_not_embed() {
3261 use zeph_llm::provider::MessagePart;
3262
3263 let provider = mock_provider(vec![]);
3264 let channel = MockChannel::new(vec![]);
3265 let registry = create_test_registry();
3266 let executor = MockToolExecutor::no_tools();
3267
3268 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3269 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3270 let cid = memory.sqlite().create_conversation().await.unwrap();
3271
3272 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3273 .with_metrics(tx)
3274 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3275 agent.services.memory.persistence.autosave_assistant = true;
3276 agent.services.memory.persistence.autosave_min_length = 0;
3277
3278 let parts = vec![MessagePart::ToolResult {
3279 tool_use_id: "tu2".into(),
3280 content: "[stopped] execution limit reached".into(),
3281 is_error: false,
3282 }];
3283
3284 agent
3285 .persist_message(
3286 Role::User,
3287 "[stopped] execution limit reached",
3288 &parts,
3289 false,
3290 )
3291 .await;
3292
3293 assert_eq!(
3294 rx.borrow().embeddings_generated,
3295 0,
3296 "[stopped] ToolResult must not be embedded into Qdrant"
3297 );
3298 }
3299
3300 #[tokio::test]
3301 async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3302 use zeph_llm::provider::MessagePart;
3305
3306 let provider = mock_provider(vec![]);
3307 let channel = MockChannel::new(vec![]);
3308 let registry = create_test_registry();
3309 let executor = MockToolExecutor::no_tools();
3310
3311 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3312 let cid = memory.sqlite().create_conversation().await.unwrap();
3313 let memory_arc = std::sync::Arc::new(memory);
3314
3315 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3316 memory_arc.clone(),
3317 cid,
3318 50,
3319 5,
3320 100,
3321 );
3322 agent.services.memory.persistence.autosave_assistant = true;
3323 agent.services.memory.persistence.autosave_min_length = 0;
3324
3325 let content = "total 42\ndrwxr-xr-x 5 user group";
3326 let parts = vec![MessagePart::ToolResult {
3327 tool_use_id: "tu3".into(),
3328 content: content.into(),
3329 is_error: false,
3330 }];
3331
3332 agent
3333 .persist_message(Role::User, content, &parts, false)
3334 .await;
3335
3336 let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3338 assert_eq!(
3339 history.len(),
3340 1,
3341 "normal ToolResult must be saved to SQLite"
3342 );
3343 assert_eq!(history[0].content, content);
3344 }
3345
/// Exercises the tail-window arithmetic `len.saturating_sub(max)` for a history
/// longer than the cap: 100 messages with a cap of 20 must give a window of 20
/// items starting at index 80.
///
/// Note: this checks the arithmetic on local values only; it does not call any
/// extraction code.
#[test]
fn trajectory_extraction_slice_bounds_messages() {
    const CAP: usize = 20;
    const HISTORY_LEN: usize = 100;

    let start = HISTORY_LEN.saturating_sub(CAP);
    let kept = HISTORY_LEN - start;

    assert_eq!(
        kept, 20,
        "slice should contain exactly max_messages items"
    );
    assert_eq!(start, 80, "slice should start at len - max_messages");
}
3365
/// Exercises the tail-window arithmetic `len.saturating_sub(max)` for a history
/// shorter than the cap: 5 messages with a cap of 20 must give a window of all
/// 5 items starting at index 0, because the subtraction saturates at zero.
///
/// Note: this checks the arithmetic on local values only; it does not call any
/// extraction code.
#[test]
fn trajectory_extraction_slice_handles_few_messages() {
    const CAP: usize = 20;
    const HISTORY_LEN: usize = 5;

    let start = HISTORY_LEN.saturating_sub(CAP);
    let kept = HISTORY_LEN - start;

    assert_eq!(kept, 5, "should return all messages when fewer than max");
    assert_eq!(start, 0, "slice should start from the beginning");
}
3377
3378 #[tokio::test]
3384 async fn regression_3168_complete_tool_pair_survives_round_trip() {
3385 use zeph_llm::provider::MessagePart;
3386
3387 let provider = mock_provider(vec![]);
3388 let channel = MockChannel::new(vec![]);
3389 let registry = create_test_registry();
3390 let executor = MockToolExecutor::no_tools();
3391
3392 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3393 let cid = memory.sqlite().create_conversation().await.unwrap();
3394 let sqlite = memory.sqlite();
3395
3396 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3397 id: "r3168_call".to_string(),
3398 name: "shell".to_string(),
3399 input: serde_json::json!({"command": "echo hi"}),
3400 }])
3401 .unwrap();
3402 sqlite
3403 .save_message_with_parts(
3404 cid,
3405 "assistant",
3406 "[tool_use: shell(r3168_call)]",
3407 &use_parts,
3408 )
3409 .await
3410 .unwrap();
3411
3412 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3413 tool_use_id: "r3168_call".to_string(),
3414 content: "[skipped]".to_string(),
3415 is_error: false,
3416 }])
3417 .unwrap();
3418 sqlite
3419 .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3420 .await
3421 .unwrap();
3422
3423 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3424 std::sync::Arc::new(memory),
3425 cid,
3426 50,
3427 5,
3428 100,
3429 );
3430
3431 let base = agent.msg.messages.len();
3432 agent.load_history().await.unwrap();
3433
3434 assert_eq!(
3435 agent.msg.messages.len(),
3436 base + 2,
3437 "both messages of the complete pair must survive load_history"
3438 );
3439
3440 let assistant_msg = agent
3441 .msg
3442 .messages
3443 .iter()
3444 .find(|m| m.role == Role::Assistant)
3445 .expect("assistant message missing after load_history");
3446 assert!(
3447 assistant_msg
3448 .parts
3449 .iter()
3450 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3451 "ToolUse part must be preserved in assistant message"
3452 );
3453
3454 let user_msg = agent
3455 .msg
3456 .messages
3457 .iter()
3458 .rev()
3459 .find(|m| m.role == Role::User)
3460 .expect("user message missing after load_history");
3461 assert!(
3462 user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3463 "ToolResult part must be preserved in user message"
3464 );
3465 }
3466
3467 #[tokio::test]
3472 async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3473 use zeph_llm::provider::MessagePart;
3474
3475 let provider = mock_provider(vec![]);
3476 let channel = MockChannel::new(vec![]);
3477 let registry = create_test_registry();
3478 let executor = MockToolExecutor::no_tools();
3479
3480 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3481 let cid = memory.sqlite().create_conversation().await.unwrap();
3482 let sqlite = memory.sqlite();
3483
3484 sqlite
3488 .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3489 .await
3490 .unwrap();
3491
3492 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3494 tool_use_id: "corrupt".to_string(),
3495 content: "result".to_string(),
3496 is_error: false,
3497 }])
3498 .unwrap();
3499 sqlite
3500 .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3501 .await
3502 .unwrap();
3503
3504 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3505 std::sync::Arc::new(memory),
3506 cid,
3507 50,
3508 5,
3509 100,
3510 );
3511
3512 let base = agent.msg.messages.len();
3513 agent.load_history().await.unwrap();
3514
3515 let loaded = agent.msg.messages.len() - base;
3520 let orphan_present = agent.msg.messages.iter().any(|m| {
3523 m.role == Role::User
3524 && m.parts.iter().any(|p| {
3525 matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3526 })
3527 });
3528 assert!(
3529 !orphan_present,
3530 "orphaned ToolResult must not survive load_history; loaded={loaded}"
3531 );
3532 }
3533}