1use crate::channel::Channel;
5use zeph_agent_persistence::graph::{build_graph_extraction_config, collect_context_messages};
6use zeph_agent_persistence::{
7 MemoryPersistenceView, MetricsView, PersistMessageRequest, PersistenceService, SecurityView,
8};
9use zeph_llm::provider::{LlmProvider as _, MessagePart, Role};
10
11use super::Agent;
12
13impl<C: Channel> Agent<C> {
14 pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
30 let (Some(memory), Some(cid)) = (
31 self.services.memory.persistence.memory.as_ref(),
32 self.services.memory.persistence.conversation_id,
33 ) else {
34 return Ok(());
35 };
36
37 let memory = memory.clone();
39
40 let mut unsummarized = self.services.memory.persistence.unsummarized_count;
41 let memory_view = MemoryPersistenceView {
44 memory: Some(&memory),
45 conversation_id: self.services.memory.persistence.conversation_id,
46 autosave_assistant: self.services.memory.persistence.autosave_assistant,
47 autosave_min_length: self.services.memory.persistence.autosave_min_length,
48 unsummarized_count: &mut unsummarized,
49 goal_text: self.services.memory.extraction.goal_text.clone(),
50 };
51 let mut sqlite_delta = 0u64;
52 let mut embed_delta = 0u64;
53 let mut guard_delta = 0u64;
54 let mut metrics_view = MetricsView {
55 sqlite_message_count: &mut sqlite_delta,
56 embeddings_generated: &mut embed_delta,
57 exfiltration_memory_guards: &mut guard_delta,
58 };
59
60 let svc = PersistenceService::new();
61 let outcome = svc
62 .load_history(
63 &mut self.msg.messages,
64 &mut self.msg.last_persisted_message_id,
65 &mut self.msg.deferred_db_hide_ids,
66 &mut self.msg.deferred_db_summaries,
67 &memory_view,
68 &zeph_config::Config::default(),
69 &mut metrics_view,
70 )
71 .await
72 .map_err(|e| {
73 super::error::AgentError::Memory(zeph_memory::MemoryError::Other(e.to_string()))
74 })?;
75
76 self.services.memory.persistence.unsummarized_count = unsummarized;
78
79 if outcome.messages_loaded > 0 {
80 let _ = memory
82 .sqlite()
83 .increment_session_counts_for_conversation(cid)
84 .await
85 .inspect_err(|e| {
86 tracing::warn!(error = %e, "failed to increment tier session counts");
87 });
88 }
89
90 self.update_metrics(|m| {
92 m.sqlite_message_count = outcome.sqlite_total_messages;
93 });
94 if let Ok(count) = memory.sqlite().count_semantic_facts().await {
95 let count_u64 = u64::try_from(count).unwrap_or(0);
96 self.update_metrics(|m| {
97 m.semantic_fact_count = count_u64;
98 });
99 }
100 if let Ok(count) = memory.unsummarized_message_count(cid).await {
101 self.services.memory.persistence.unsummarized_count =
102 usize::try_from(count).unwrap_or(0);
103 }
104
105 self.recompute_prompt_tokens();
106 Ok(())
107 }
108
109 #[cfg_attr(
115 feature = "profiling",
116 tracing::instrument(name = "agent.persist_message", skip_all)
117 )]
118 pub(crate) async fn persist_message(
119 &mut self,
120 role: Role,
121 content: &str,
122 parts: &[MessagePart],
123 has_injection_flags: bool,
124 ) {
125 let guard_event = self
129 .services
130 .security
131 .exfiltration_guard
132 .should_guard_memory_write(has_injection_flags);
133 if let Some(ref event) = guard_event {
134 tracing::warn!(
135 ?event,
136 "exfiltration guard: skipping Qdrant embedding for flagged content"
137 );
138 self.push_security_event(
139 zeph_common::SecurityEventCategory::ExfiltrationBlock,
140 "memory_write",
141 "Qdrant embedding skipped: flagged content",
142 );
143 }
144
145 let req = PersistMessageRequest::from_borrowed(role, content, parts, has_injection_flags);
146
147 let mut unsummarized = self.services.memory.persistence.unsummarized_count;
148 let memory_arc = self.services.memory.persistence.memory.clone();
149 let mut memory_view = MemoryPersistenceView {
150 memory: memory_arc.as_ref(),
151 conversation_id: self.services.memory.persistence.conversation_id,
152 autosave_assistant: self.services.memory.persistence.autosave_assistant,
153 autosave_min_length: self.services.memory.persistence.autosave_min_length,
154 unsummarized_count: &mut unsummarized,
155 goal_text: self.services.memory.extraction.goal_text.clone(),
156 };
157 let security = SecurityView {
158 guard_memory_writes: guard_event.is_some(),
159 _phantom: std::marker::PhantomData,
160 };
161 let mut sqlite_delta = 0u64;
162 let mut embed_delta = 0u64;
163 let mut guard_delta = 0u64;
164 let mut metrics_view = MetricsView {
165 sqlite_message_count: &mut sqlite_delta,
166 embeddings_generated: &mut embed_delta,
167 exfiltration_memory_guards: &mut guard_delta,
168 };
169
170 let svc = PersistenceService::new();
171 let outcome = svc
172 .persist_message(
173 req,
174 &mut self.msg.last_persisted_message_id,
175 &mut memory_view,
176 &security,
177 &zeph_config::Config::default(),
178 &mut metrics_view,
179 )
180 .await;
181
182 self.services.memory.persistence.unsummarized_count = unsummarized;
184
185 self.update_metrics(|m| {
187 m.sqlite_message_count += sqlite_delta;
188 m.embeddings_generated += embed_delta;
189 m.exfiltration_memory_guards += guard_delta;
191 });
192
193 if outcome.message_id.is_none() {
194 return;
195 }
196
197 self.enqueue_summarization_task();
201
202 let has_tool_result_parts = parts
205 .iter()
206 .any(|p| matches!(p, MessagePart::ToolResult { .. }));
207
208 self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
209 .await;
210
211 if role == Role::User && !has_tool_result_parts && !has_injection_flags {
213 self.enqueue_persona_extraction_task();
214 }
215
216 if has_tool_result_parts {
218 self.enqueue_trajectory_extraction_task();
219 }
220
221 let has_tool_use_parts = parts
226 .iter()
227 .any(|p| matches!(p, MessagePart::ToolUse { .. }));
228 if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
229 self.enqueue_reasoning_extraction_task();
230 }
231 }
232
233 fn enqueue_summarization_task(&mut self) {
235 let (Some(memory), Some(cid)) = (
236 self.services.memory.persistence.memory.clone(),
237 self.services.memory.persistence.conversation_id,
238 ) else {
239 return;
240 };
241
242 if self.services.memory.persistence.unsummarized_count
243 <= self.services.memory.compaction.summarization_threshold
244 {
245 return;
246 }
247
248 let batch_size = self.services.memory.compaction.summarization_threshold / 2;
249
250 self.runtime.lifecycle.supervisor.spawn_summarization("summarization", async move {
251 match tokio::time::timeout(
252 std::time::Duration::from_secs(30),
253 memory.summarize(cid, batch_size),
254 )
255 .await
256 {
257 Ok(Ok(Some(summary_id))) => {
258 tracing::info!(
259 "background summarization: created summary {summary_id} for conversation {cid}"
260 );
261 true
262 }
263 Ok(Ok(None)) => {
264 tracing::debug!("background summarization: no summarization needed");
265 false
266 }
267 Ok(Err(e)) => {
268 tracing::error!("background summarization failed: {e:#}");
269 false
270 }
271 Err(_) => {
272 tracing::warn!("background summarization timed out after 30s");
273 false
274 }
275 }
276 });
277 }
278
279 async fn enqueue_graph_extraction_task(
284 &mut self,
285 content: &str,
286 has_injection_flags: bool,
287 has_tool_result_parts: bool,
288 ) {
289 if self.services.memory.persistence.memory.is_none()
290 || self.services.memory.persistence.conversation_id.is_none()
291 {
292 return;
293 }
294 if has_tool_result_parts {
295 tracing::debug!("graph extraction skipped: message contains ToolResult parts");
296 return;
297 }
298 if has_injection_flags {
299 tracing::warn!("graph extraction skipped: injection patterns detected in content");
300 return;
301 }
302
303 let cfg = &self.services.memory.extraction.graph_config;
304 if !cfg.enabled {
305 return;
306 }
307 let extraction_cfg = build_graph_extraction_config(
308 cfg,
309 self.services
310 .memory
311 .persistence
312 .conversation_id
313 .map(|c| c.0),
314 );
315
316 if self.rpe_should_skip(content).await {
319 tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
320 return;
321 }
322
323 let context_messages = collect_context_messages(&self.msg.messages);
324
325 let Some(memory) = self.services.memory.persistence.memory.clone() else {
326 return;
327 };
328
329 let validator: zeph_memory::semantic::PostExtractValidator =
330 if self.services.security.memory_validator.is_enabled() {
331 let v = self.services.security.memory_validator.clone();
332 Some(Box::new(move |result| {
333 v.validate_graph_extraction(result)
334 .map_err(|e| e.to_string())
335 }))
336 } else {
337 None
338 };
339
340 self.spawn_graph_extraction_task(
341 memory,
342 content,
343 context_messages,
344 extraction_cfg,
345 validator,
346 );
347
348 self.sync_community_detection_failures();
350 self.sync_graph_extraction_metrics();
351 self.enqueue_graph_count_sync_task();
352 }
353
354 fn spawn_graph_extraction_task(
355 &mut self,
356 memory: std::sync::Arc<zeph_memory::semantic::SemanticMemory>,
357 content: &str,
358 context_messages: Vec<String>,
359 extraction_cfg: zeph_memory::semantic::GraphExtractionConfig,
360 validator: zeph_memory::semantic::PostExtractValidator,
361 ) {
362 let content_owned = content.to_owned();
363 let graph_store = memory.graph_store.clone();
364 let metrics_tx = self.runtime.metrics.metrics_tx.clone();
365 let start_time = self.runtime.lifecycle.start_time;
366
367 self.runtime.lifecycle.supervisor.spawn(
368 super::agent_supervisor::TaskClass::Enrichment,
369 "graph_extraction",
370 async move {
371 let extraction_handle = memory.spawn_graph_extraction(
372 content_owned,
373 context_messages,
374 extraction_cfg,
375 validator,
376 );
377
378 if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
380 let _ = extraction_handle.await;
381 let (entities, edges, communities) = tokio::join!(
382 store.entity_count(),
383 store.active_edge_count(),
384 store.community_count()
385 );
386 let elapsed = start_time.elapsed().as_secs();
387 tx.send_modify(|m| {
388 m.uptime_seconds = elapsed;
389 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
390 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
391 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
392 });
393 } else {
394 let _ = extraction_handle.await;
395 }
396
397 tracing::debug!("background graph extraction complete");
398 },
399 );
400 }
401
402 fn enqueue_graph_count_sync_task(&mut self) {
404 let memory_for_sync = self.services.memory.persistence.memory.clone();
405 let metrics_tx_sync = self.runtime.metrics.metrics_tx.clone();
406 let start_time_sync = self.runtime.lifecycle.start_time;
407 let cid_sync = self.services.memory.persistence.conversation_id;
408 let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
409 let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
410 let guidelines_enabled = self.services.memory.extraction.graph_config.enabled;
411
412 self.runtime.lifecycle.supervisor.spawn(
413 super::agent_supervisor::TaskClass::Telemetry,
414 "graph_count_sync",
415 async move {
416 let Some(store) = graph_store_sync else {
417 return;
418 };
419 let Some(tx) = metrics_tx_sync else { return };
420
421 let (entities, edges, communities) = tokio::join!(
422 store.entity_count(),
423 store.active_edge_count(),
424 store.community_count()
425 );
426 let elapsed = start_time_sync.elapsed().as_secs();
427 tx.send_modify(|m| {
428 m.uptime_seconds = elapsed;
429 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
430 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
431 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
432 });
433
434 if guidelines_enabled && let Some(sqlite) = sqlite_sync {
436 match tokio::time::timeout(
437 std::time::Duration::from_secs(10),
438 sqlite.load_compression_guidelines_meta(cid_sync),
439 )
440 .await
441 {
442 Ok(Ok((version, created_at))) => {
443 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
444 let version_u32 = u32::try_from(version).unwrap_or(0);
445 tx.send_modify(|m| {
446 m.guidelines_version = version_u32;
447 m.guidelines_updated_at = created_at;
448 });
449 }
450 Ok(Err(e)) => {
451 tracing::debug!("guidelines status sync failed: {e:#}");
452 }
453 Err(_) => {
454 tracing::debug!("guidelines status sync timed out");
455 }
456 }
457 }
458 },
459 );
460 }
461
462 fn enqueue_persona_extraction_task(&mut self) {
464 use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
465
466 let cfg = &self.services.memory.extraction.persona_config;
467 if !cfg.enabled {
468 return;
469 }
470
471 let Some(memory) = &self.services.memory.persistence.memory else {
472 return;
473 };
474
475 let user_messages: Vec<String> = self
476 .msg
477 .messages
478 .iter()
479 .filter(|m| {
480 m.role == Role::User
481 && !m
482 .parts
483 .iter()
484 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
485 })
486 .take(8)
487 .map(|m| {
488 if m.content.len() > 2048 {
489 m.content[..m.content.floor_char_boundary(2048)].to_owned()
490 } else {
491 m.content.clone()
492 }
493 })
494 .collect();
495
496 if user_messages.len() < cfg.min_messages {
497 return;
498 }
499
500 let timeout_secs = cfg.extraction_timeout_secs;
501 let extraction_cfg = PersonaExtractionConfig {
502 enabled: cfg.enabled,
503 min_messages: cfg.min_messages,
504 max_messages: cfg.max_messages,
505 extraction_timeout_secs: timeout_secs,
506 };
507
508 let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
509 let store = memory.sqlite().clone();
510 let conversation_id = self
511 .services
512 .memory
513 .persistence
514 .conversation_id
515 .map(|c| c.0);
516
517 self.runtime.lifecycle.supervisor.spawn(
518 super::agent_supervisor::TaskClass::Enrichment,
519 "persona_extraction",
520 async move {
521 let user_message_refs: Vec<&str> =
522 user_messages.iter().map(String::as_str).collect();
523 let fut = extract_persona_facts(
524 &store,
525 &provider,
526 &user_message_refs,
527 &extraction_cfg,
528 conversation_id,
529 );
530 match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
531 {
532 Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
533 Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
534 Err(_) => tracing::warn!(
535 timeout_secs,
536 "persona extraction timed out — no facts written this turn"
537 ),
538 }
539 },
540 );
541 }
542
543 fn enqueue_trajectory_extraction_task(&mut self) {
545 use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
546
547 let cfg = self.services.memory.extraction.trajectory_config.clone();
548 if !cfg.enabled {
549 return;
550 }
551
552 let Some(memory) = &self.services.memory.persistence.memory else {
553 return;
554 };
555
556 let conversation_id = match self.services.memory.persistence.conversation_id {
557 Some(cid) => cid.0,
558 None => return,
559 };
560
561 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
562 let turn_messages: Vec<zeph_llm::provider::Message> =
563 self.msg.messages[tail_start..].to_vec();
564
565 if turn_messages.is_empty() {
566 return;
567 }
568
569 let extraction_cfg = TrajectoryExtractionConfig {
570 enabled: cfg.enabled,
571 max_messages: cfg.max_messages,
572 extraction_timeout_secs: cfg.extraction_timeout_secs,
573 };
574
575 let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
576 let store = memory.sqlite().clone();
577 let min_confidence = cfg.min_confidence;
578
579 self.runtime.lifecycle.supervisor.spawn(
580 super::agent_supervisor::TaskClass::Enrichment,
581 "trajectory_extraction",
582 async move {
583 let entries =
584 match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
585 .await
586 {
587 Ok(e) => e,
588 Err(e) => {
589 tracing::warn!(error = %e, "trajectory extraction failed");
590 return;
591 }
592 };
593
594 let last_id = store
595 .trajectory_last_extracted_message_id(conversation_id)
596 .await
597 .unwrap_or(0);
598
599 let mut max_id = last_id;
600 for entry in &entries {
601 if entry.confidence < min_confidence {
602 continue;
603 }
604 let tools_json = serde_json::to_string(&entry.tools_used)
605 .unwrap_or_else(|_| "[]".to_string());
606 match store
607 .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
608 conversation_id: Some(conversation_id),
609 turn_index: 0,
610 kind: &entry.kind,
611 intent: &entry.intent,
612 outcome: &entry.outcome,
613 tools_used: &tools_json,
614 confidence: entry.confidence,
615 })
616 .await
617 {
618 Ok(id) => {
619 if id > max_id {
620 max_id = id;
621 }
622 }
623 Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
624 }
625 }
626
627 if max_id > last_id {
628 let _ = store
629 .set_trajectory_last_extracted_message_id(conversation_id, max_id)
630 .await;
631 }
632
633 tracing::debug!(
634 count = entries.len(),
635 conversation_id,
636 "trajectory extraction complete"
637 );
638 },
639 );
640 }
641
642 fn enqueue_reasoning_extraction_task(&mut self) {
647 let cfg = self.services.memory.extraction.reasoning_config.clone();
648 if !cfg.enabled {
649 return;
650 }
651
652 let Some(memory) = &self.services.memory.persistence.memory else {
653 return;
654 };
655
656 let Some(reasoning) = memory.reasoning.clone() else {
657 return;
658 };
659
660 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
661 let turn_messages: Vec<zeph_llm::provider::Message> =
662 self.msg.messages[tail_start..].to_vec();
663
664 if turn_messages.len() < cfg.min_messages {
665 return;
666 }
667
668 let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
669 let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
670 let embed_provider = memory.effective_embed_provider().clone();
671 let store_limit = cfg.store_limit;
672 let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
673 let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
674 let self_judge_window = cfg.self_judge_window;
675 let min_assistant_chars = cfg.min_assistant_chars;
676
677 self.runtime.lifecycle.supervisor.spawn(
678 super::agent_supervisor::TaskClass::Enrichment,
679 "reasoning_extraction",
680 async move {
681 if let Err(e) = zeph_memory::process_reasoning_turn(
682 &reasoning,
683 &extract_provider,
684 &distill_provider,
685 &embed_provider,
686 &turn_messages,
687 zeph_memory::ProcessTurnConfig {
688 store_limit,
689 extraction_timeout,
690 distill_timeout,
691 self_judge_window,
692 min_assistant_chars,
693 },
694 )
695 .await
696 {
697 tracing::warn!(error = %e, "reasoning: process_turn failed");
698 }
699
700 tracing::debug!("reasoning extraction complete");
701 },
702 );
703 }
704
705 async fn rpe_should_skip(&mut self, content: &str) -> bool {
710 let Some(ref rpe_mutex) = self.services.memory.extraction.rpe_router else {
711 return false;
712 };
713 let Some(memory) = &self.services.memory.persistence.memory else {
714 return false;
715 };
716 let candidates = zeph_memory::extract_candidate_entities(content);
717 let provider = memory.provider();
718 let Ok(Ok(emb_vec)) =
719 tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
720 else {
721 return false; };
723 if let Ok(mut router) = rpe_mutex.lock() {
724 let signal = router.compute(&emb_vec, &candidates);
725 router.push_embedding(emb_vec);
726 router.push_entities(&candidates);
727 !signal.should_extract
728 } else {
729 tracing::warn!("rpe_router mutex poisoned; falling through to extract");
730 false
731 }
732 }
733}
734
735#[cfg(test)]
736mod tests {
737 use super::super::agent_tests::{
738 MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
739 };
740 use super::*;
741 use zeph_llm::any::AnyProvider;
742 use zeph_llm::provider::Message;
743 use zeph_memory::semantic::SemanticMemory;
744
745 async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
746 SemanticMemory::new(
747 ":memory:",
748 "http://127.0.0.1:1",
749 None,
750 provider.clone(),
751 "test-model",
752 )
753 .await
754 .unwrap()
755 }
756
757 #[tokio::test]
758 async fn load_history_without_memory_returns_ok() {
759 let provider = mock_provider(vec![]);
760 let channel = MockChannel::new(vec![]);
761 let registry = create_test_registry();
762 let executor = MockToolExecutor::no_tools();
763 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
764
765 let result = agent.load_history().await;
766 assert!(result.is_ok());
767 assert_eq!(agent.msg.messages.len(), 1); }
770
771 #[tokio::test]
772 async fn load_history_with_messages_injects_into_agent() {
773 let provider = mock_provider(vec![]);
774 let channel = MockChannel::new(vec![]);
775 let registry = create_test_registry();
776 let executor = MockToolExecutor::no_tools();
777
778 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
779 let cid = memory.sqlite().create_conversation().await.unwrap();
780
781 memory
782 .sqlite()
783 .save_message(cid, "user", "hello from history")
784 .await
785 .unwrap();
786 memory
787 .sqlite()
788 .save_message(cid, "assistant", "hi back")
789 .await
790 .unwrap();
791
792 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
793 std::sync::Arc::new(memory),
794 cid,
795 50,
796 5,
797 100,
798 );
799
800 let messages_before = agent.msg.messages.len();
801 agent.load_history().await.unwrap();
802 assert_eq!(agent.msg.messages.len(), messages_before + 2);
804 }
805
806 #[tokio::test]
807 async fn load_history_skips_empty_messages() {
808 let provider = mock_provider(vec![]);
809 let channel = MockChannel::new(vec![]);
810 let registry = create_test_registry();
811 let executor = MockToolExecutor::no_tools();
812
813 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
814 let cid = memory.sqlite().create_conversation().await.unwrap();
815
816 memory
818 .sqlite()
819 .save_message(cid, "user", " ")
820 .await
821 .unwrap();
822 memory
823 .sqlite()
824 .save_message(cid, "user", "real message")
825 .await
826 .unwrap();
827
828 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
829 std::sync::Arc::new(memory),
830 cid,
831 50,
832 5,
833 100,
834 );
835
836 let messages_before = agent.msg.messages.len();
837 agent.load_history().await.unwrap();
838 assert_eq!(agent.msg.messages.len(), messages_before + 1);
840 }
841
842 #[tokio::test]
843 async fn load_history_with_empty_store_returns_ok() {
844 let provider = mock_provider(vec![]);
845 let channel = MockChannel::new(vec![]);
846 let registry = create_test_registry();
847 let executor = MockToolExecutor::no_tools();
848
849 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
850 let cid = memory.sqlite().create_conversation().await.unwrap();
851
852 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
853 std::sync::Arc::new(memory),
854 cid,
855 50,
856 5,
857 100,
858 );
859
860 let messages_before = agent.msg.messages.len();
861 agent.load_history().await.unwrap();
862 assert_eq!(agent.msg.messages.len(), messages_before);
864 }
865
866 #[tokio::test]
867 async fn load_history_increments_session_count_for_existing_messages() {
868 let provider = mock_provider(vec![]);
869 let channel = MockChannel::new(vec![]);
870 let registry = create_test_registry();
871 let executor = MockToolExecutor::no_tools();
872
873 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
874 let cid = memory.sqlite().create_conversation().await.unwrap();
875
876 let id1 = memory
878 .sqlite()
879 .save_message(cid, "user", "hello")
880 .await
881 .unwrap();
882 let id2 = memory
883 .sqlite()
884 .save_message(cid, "assistant", "hi")
885 .await
886 .unwrap();
887
888 let memory_arc = std::sync::Arc::new(memory);
889 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
890 memory_arc.clone(),
891 cid,
892 50,
893 5,
894 100,
895 );
896
897 agent.load_history().await.unwrap();
898
899 let counts: Vec<i64> = zeph_db::query_scalar(
901 "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
902 )
903 .bind(id1)
904 .bind(id2)
905 .fetch_all(memory_arc.sqlite().pool())
906 .await
907 .unwrap();
908 assert_eq!(
909 counts,
910 vec![1, 1],
911 "session_count must be 1 after first restore"
912 );
913 }
914
915 #[tokio::test]
916 async fn load_history_does_not_increment_session_count_for_new_conversation() {
917 let provider = mock_provider(vec![]);
918 let channel = MockChannel::new(vec![]);
919 let registry = create_test_registry();
920 let executor = MockToolExecutor::no_tools();
921
922 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
923 let cid = memory.sqlite().create_conversation().await.unwrap();
924
925 let memory_arc = std::sync::Arc::new(memory);
927 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
928 memory_arc.clone(),
929 cid,
930 50,
931 5,
932 100,
933 );
934
935 agent.load_history().await.unwrap();
936
937 let counts: Vec<i64> =
939 zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
940 .bind(cid)
941 .fetch_all(memory_arc.sqlite().pool())
942 .await
943 .unwrap();
944 assert!(counts.is_empty(), "new conversation must have no messages");
945 }
946
947 #[tokio::test]
948 async fn persist_message_without_memory_silently_returns() {
949 let provider = mock_provider(vec![]);
951 let channel = MockChannel::new(vec![]);
952 let registry = create_test_registry();
953 let executor = MockToolExecutor::no_tools();
954 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
955
956 agent.persist_message(Role::User, "hello", &[], false).await;
958 }
959
960 #[tokio::test]
961 async fn persist_message_assistant_autosave_false_uses_save_only() {
962 let provider = mock_provider(vec![]);
963 let channel = MockChannel::new(vec![]);
964 let registry = create_test_registry();
965 let executor = MockToolExecutor::no_tools();
966
967 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
968 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
969 let cid = memory.sqlite().create_conversation().await.unwrap();
970
971 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
972 .with_metrics(tx)
973 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
974 agent.services.memory.persistence.autosave_assistant = false;
975 agent.services.memory.persistence.autosave_min_length = 20;
976
977 agent
978 .persist_message(Role::Assistant, "short assistant reply", &[], false)
979 .await;
980
981 let history = agent
982 .services
983 .memory
984 .persistence
985 .memory
986 .as_ref()
987 .unwrap()
988 .sqlite()
989 .load_history(cid, 50)
990 .await
991 .unwrap();
992 assert_eq!(history.len(), 1, "message must be saved");
993 assert_eq!(history[0].content, "short assistant reply");
994 assert_eq!(rx.borrow().embeddings_generated, 0);
996 }
997
998 #[tokio::test]
999 async fn persist_message_assistant_below_min_length_uses_save_only() {
1000 let provider = mock_provider(vec![]);
1001 let channel = MockChannel::new(vec![]);
1002 let registry = create_test_registry();
1003 let executor = MockToolExecutor::no_tools();
1004
1005 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1006 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1007 let cid = memory.sqlite().create_conversation().await.unwrap();
1008
1009 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1011 .with_metrics(tx)
1012 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1013 agent.services.memory.persistence.autosave_assistant = true;
1014 agent.services.memory.persistence.autosave_min_length = 1000;
1015
1016 agent
1017 .persist_message(Role::Assistant, "too short", &[], false)
1018 .await;
1019
1020 let history = agent
1021 .services
1022 .memory
1023 .persistence
1024 .memory
1025 .as_ref()
1026 .unwrap()
1027 .sqlite()
1028 .load_history(cid, 50)
1029 .await
1030 .unwrap();
1031 assert_eq!(history.len(), 1, "message must be saved");
1032 assert_eq!(history[0].content, "too short");
1033 assert_eq!(rx.borrow().embeddings_generated, 0);
1034 }
1035
1036 #[tokio::test]
1037 async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1038 let provider = mock_provider(vec![]);
1040 let channel = MockChannel::new(vec![]);
1041 let registry = create_test_registry();
1042 let executor = MockToolExecutor::no_tools();
1043
1044 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1045 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1046 let cid = memory.sqlite().create_conversation().await.unwrap();
1047
1048 let min_length = 10usize;
1049 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1050 .with_metrics(tx)
1051 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1052 agent.services.memory.persistence.autosave_assistant = true;
1053 agent.services.memory.persistence.autosave_min_length = min_length;
1054
1055 let content_at_boundary = "A".repeat(min_length);
1057 assert_eq!(content_at_boundary.len(), min_length);
1058 agent
1059 .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1060 .await;
1061
1062 assert_eq!(rx.borrow().sqlite_message_count, 1);
1064 }
1065
1066 #[tokio::test]
1067 async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1068 let provider = mock_provider(vec![]);
1070 let channel = MockChannel::new(vec![]);
1071 let registry = create_test_registry();
1072 let executor = MockToolExecutor::no_tools();
1073
1074 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1075 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1076 let cid = memory.sqlite().create_conversation().await.unwrap();
1077
1078 let min_length = 10usize;
1079 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1080 .with_metrics(tx)
1081 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1082 agent.services.memory.persistence.autosave_assistant = true;
1083 agent.services.memory.persistence.autosave_min_length = min_length;
1084
1085 let content_below_boundary = "A".repeat(min_length - 1);
1087 assert_eq!(content_below_boundary.len(), min_length - 1);
1088 agent
1089 .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1090 .await;
1091
1092 let history = agent
1093 .services
1094 .memory
1095 .persistence
1096 .memory
1097 .as_ref()
1098 .unwrap()
1099 .sqlite()
1100 .load_history(cid, 50)
1101 .await
1102 .unwrap();
1103 assert_eq!(history.len(), 1, "message must still be saved");
1104 assert_eq!(rx.borrow().embeddings_generated, 0);
1106 }
1107
1108 #[tokio::test]
1109 async fn persist_message_increments_unsummarized_count() {
1110 let provider = mock_provider(vec![]);
1111 let channel = MockChannel::new(vec![]);
1112 let registry = create_test_registry();
1113 let executor = MockToolExecutor::no_tools();
1114
1115 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1116 let cid = memory.sqlite().create_conversation().await.unwrap();
1117
1118 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1120 std::sync::Arc::new(memory),
1121 cid,
1122 50,
1123 5,
1124 100,
1125 );
1126
1127 assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1128
1129 agent.persist_message(Role::User, "first", &[], false).await;
1130 assert_eq!(agent.services.memory.persistence.unsummarized_count, 1);
1131
1132 agent
1133 .persist_message(Role::User, "second", &[], false)
1134 .await;
1135 assert_eq!(agent.services.memory.persistence.unsummarized_count, 2);
1136 }
1137
1138 #[tokio::test]
1139 async fn check_summarization_resets_counter_on_success() {
1140 let provider = mock_provider(vec![]);
1141 let channel = MockChannel::new(vec![]);
1142 let registry = create_test_registry();
1143 let executor = MockToolExecutor::no_tools();
1144
1145 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1146 let cid = memory.sqlite().create_conversation().await.unwrap();
1147
1148 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1150 std::sync::Arc::new(memory),
1151 cid,
1152 50,
1153 5,
1154 1,
1155 );
1156
1157 agent.persist_message(Role::User, "msg1", &[], false).await;
1158 agent.persist_message(Role::User, "msg2", &[], false).await;
1159
1160 assert!(agent.services.memory.persistence.unsummarized_count <= 2);
1165 }
1166
1167 #[tokio::test]
1168 async fn unsummarized_count_not_incremented_without_memory() {
1169 let provider = mock_provider(vec![]);
1170 let channel = MockChannel::new(vec![]);
1171 let registry = create_test_registry();
1172 let executor = MockToolExecutor::no_tools();
1173 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1174
1175 agent.persist_message(Role::User, "hello", &[], false).await;
1176 assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1178 }
1179
1180 mod graph_extraction_guards {
1182 use super::*;
1183 use crate::config::GraphConfig;
1184 use zeph_llm::provider::MessageMetadata;
1185 use zeph_memory::graph::GraphStore;
1186
1187 fn enabled_graph_config() -> GraphConfig {
1188 GraphConfig {
1189 enabled: true,
1190 ..GraphConfig::default()
1191 }
1192 }
1193
1194 async fn agent_with_graph(
1195 provider: &AnyProvider,
1196 config: GraphConfig,
1197 ) -> Agent<MockChannel> {
1198 let memory =
1199 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1200 let cid = memory.sqlite().create_conversation().await.unwrap();
1201 Agent::new(
1202 provider.clone(),
1203 MockChannel::new(vec![]),
1204 create_test_registry(),
1205 None,
1206 5,
1207 MockToolExecutor::no_tools(),
1208 )
1209 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1210 .with_graph_config(config)
1211 }
1212
1213 #[tokio::test]
1214 async fn injection_flag_guard_skips_extraction() {
1215 let provider = mock_provider(vec![]);
1217 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1218 let pool = agent
1219 .services
1220 .memory
1221 .persistence
1222 .memory
1223 .as_ref()
1224 .unwrap()
1225 .sqlite()
1226 .pool()
1227 .clone();
1228
1229 agent
1230 .enqueue_graph_extraction_task("I use Rust", true, false)
1231 .await;
1232
1233 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1235
1236 let store = GraphStore::new(pool);
1237 let count = store.get_metadata("extraction_count").await.unwrap();
1238 assert!(
1239 count.is_none(),
1240 "injection flag must prevent extraction_count from being written"
1241 );
1242 }
1243
1244 #[tokio::test]
1245 async fn disabled_config_guard_skips_extraction() {
1246 let provider = mock_provider(vec![]);
1248 let disabled_cfg = GraphConfig {
1249 enabled: false,
1250 ..GraphConfig::default()
1251 };
1252 let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1253 let pool = agent
1254 .services
1255 .memory
1256 .persistence
1257 .memory
1258 .as_ref()
1259 .unwrap()
1260 .sqlite()
1261 .pool()
1262 .clone();
1263
1264 agent
1265 .enqueue_graph_extraction_task("I use Rust", false, false)
1266 .await;
1267
1268 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1269
1270 let store = GraphStore::new(pool);
1271 let count = store.get_metadata("extraction_count").await.unwrap();
1272 assert!(
1273 count.is_none(),
1274 "disabled graph config must prevent extraction"
1275 );
1276 }
1277
1278 #[tokio::test]
1279 async fn happy_path_fires_extraction() {
1280 let provider = mock_provider(vec![]);
1283 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1284 let pool = agent
1285 .services
1286 .memory
1287 .persistence
1288 .memory
1289 .as_ref()
1290 .unwrap()
1291 .sqlite()
1292 .pool()
1293 .clone();
1294
1295 agent
1296 .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1297 .await;
1298
1299 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1301
1302 let store = GraphStore::new(pool);
1303 let count = store.get_metadata("extraction_count").await.unwrap();
1304 assert!(
1305 count.is_some(),
1306 "happy-path extraction must increment extraction_count"
1307 );
1308 }
1309
1310 #[tokio::test]
1311 async fn tool_result_parts_guard_skips_extraction() {
1312 let provider = mock_provider(vec![]);
1316 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1317 let pool = agent
1318 .services
1319 .memory
1320 .persistence
1321 .memory
1322 .as_ref()
1323 .unwrap()
1324 .sqlite()
1325 .pool()
1326 .clone();
1327
1328 agent
1329 .enqueue_graph_extraction_task(
1330 "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1331 false,
1332 true, )
1334 .await;
1335
1336 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1337
1338 let store = GraphStore::new(pool);
1339 let count = store.get_metadata("extraction_count").await.unwrap();
1340 assert!(
1341 count.is_none(),
1342 "tool result message must not trigger graph extraction"
1343 );
1344 }
1345
1346 #[tokio::test]
1347 async fn context_filter_excludes_tool_result_messages() {
1348 let provider = mock_provider(vec![]);
1359 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1360
1361 agent.msg.messages.push(Message {
1364 role: Role::User,
1365 content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1366 parts: vec![MessagePart::ToolResult {
1367 tool_use_id: "abc".to_owned(),
1368 content: "provider_type = \"openai\"".to_owned(),
1369 is_error: false,
1370 }],
1371 metadata: MessageMetadata::default(),
1372 });
1373
1374 let pool = agent
1375 .services
1376 .memory
1377 .persistence
1378 .memory
1379 .as_ref()
1380 .unwrap()
1381 .sqlite()
1382 .pool()
1383 .clone();
1384
1385 agent
1387 .enqueue_graph_extraction_task(
1388 "I prefer Rust for systems programming",
1389 false,
1390 false,
1391 )
1392 .await;
1393
1394 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1395
1396 let store = GraphStore::new(pool);
1398 let count = store.get_metadata("extraction_count").await.unwrap();
1399 assert!(
1400 count.is_some(),
1401 "conversational message must trigger extraction even with prior tool result in history"
1402 );
1403 }
1404 }
1405
1406 mod persona_extraction_guards {
1408 use super::*;
1409 use zeph_config::PersonaConfig;
1410 use zeph_llm::provider::MessageMetadata;
1411
1412 fn enabled_persona_config() -> PersonaConfig {
1413 PersonaConfig {
1414 enabled: true,
1415 min_messages: 1,
1416 ..PersonaConfig::default()
1417 }
1418 }
1419
1420 async fn agent_with_persona(
1421 provider: &AnyProvider,
1422 config: PersonaConfig,
1423 ) -> Agent<MockChannel> {
1424 let memory =
1425 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1426 let cid = memory.sqlite().create_conversation().await.unwrap();
1427 let mut agent = Agent::new(
1428 provider.clone(),
1429 MockChannel::new(vec![]),
1430 create_test_registry(),
1431 None,
1432 5,
1433 MockToolExecutor::no_tools(),
1434 )
1435 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1436 agent.services.memory.extraction.persona_config = config;
1437 agent
1438 }
1439
1440 #[tokio::test]
1441 async fn disabled_config_skips_spawn() {
1442 let provider = mock_provider(vec![]);
1444 let mut agent = agent_with_persona(
1445 &provider,
1446 PersonaConfig {
1447 enabled: false,
1448 ..PersonaConfig::default()
1449 },
1450 )
1451 .await;
1452
1453 agent.msg.messages.push(zeph_llm::provider::Message {
1455 role: Role::User,
1456 content: "I prefer Rust for systems programming".to_owned(),
1457 parts: vec![],
1458 metadata: MessageMetadata::default(),
1459 });
1460
1461 agent.enqueue_persona_extraction_task();
1462
1463 let store = agent
1464 .services
1465 .memory
1466 .persistence
1467 .memory
1468 .as_ref()
1469 .unwrap()
1470 .sqlite()
1471 .clone();
1472 let count = store.count_persona_facts().await.unwrap();
1473 assert_eq!(count, 0, "disabled persona config must not write any facts");
1474 }
1475
1476 #[tokio::test]
1477 async fn below_min_messages_skips_spawn() {
1478 let provider = mock_provider(vec![]);
1480 let mut agent = agent_with_persona(
1481 &provider,
1482 PersonaConfig {
1483 enabled: true,
1484 min_messages: 3,
1485 ..PersonaConfig::default()
1486 },
1487 )
1488 .await;
1489
1490 for text in ["I use Rust", "I prefer async code"] {
1491 agent.msg.messages.push(zeph_llm::provider::Message {
1492 role: Role::User,
1493 content: text.to_owned(),
1494 parts: vec![],
1495 metadata: MessageMetadata::default(),
1496 });
1497 }
1498
1499 agent.enqueue_persona_extraction_task();
1500
1501 let store = agent
1502 .services
1503 .memory
1504 .persistence
1505 .memory
1506 .as_ref()
1507 .unwrap()
1508 .sqlite()
1509 .clone();
1510 let count = store.count_persona_facts().await.unwrap();
1511 assert_eq!(
1512 count, 0,
1513 "below min_messages threshold must not trigger extraction"
1514 );
1515 }
1516
1517 #[tokio::test]
1518 async fn no_memory_skips_spawn() {
1519 let provider = mock_provider(vec![]);
1521 let channel = MockChannel::new(vec![]);
1522 let registry = create_test_registry();
1523 let executor = MockToolExecutor::no_tools();
1524 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1525 agent.services.memory.extraction.persona_config = enabled_persona_config();
1526 agent.msg.messages.push(zeph_llm::provider::Message {
1527 role: Role::User,
1528 content: "I like Rust".to_owned(),
1529 parts: vec![],
1530 metadata: MessageMetadata::default(),
1531 });
1532
1533 agent.enqueue_persona_extraction_task();
1535 }
1536
1537 #[tokio::test]
1538 async fn enabled_enough_messages_spawns_extraction() {
1539 use zeph_llm::mock::MockProvider;
1542 let (mock, recorded) = MockProvider::default().with_recording();
1543 let provider = AnyProvider::Mock(mock);
1544 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1545
1546 agent.msg.messages.push(zeph_llm::provider::Message {
1547 role: Role::User,
1548 content: "I prefer Rust for systems programming".to_owned(),
1549 parts: vec![],
1550 metadata: MessageMetadata::default(),
1551 });
1552
1553 agent.enqueue_persona_extraction_task();
1554
1555 agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1557
1558 let calls = recorded.lock().unwrap();
1559 assert!(
1560 !calls.is_empty(),
1561 "happy-path: provider.chat() must be called when extraction completes"
1562 );
1563 }
1564
1565 #[tokio::test]
1566 async fn messages_capped_at_eight() {
1567 use zeph_llm::mock::MockProvider;
1570 let (mock, recorded) = MockProvider::default().with_recording();
1571 let provider = AnyProvider::Mock(mock);
1572 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1573
1574 for i in 0..12u32 {
1575 agent.msg.messages.push(zeph_llm::provider::Message {
1576 role: Role::User,
1577 content: format!("I like message {i}"),
1578 parts: vec![],
1579 metadata: MessageMetadata::default(),
1580 });
1581 }
1582
1583 agent.enqueue_persona_extraction_task();
1584
1585 agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1587
1588 let calls = recorded.lock().unwrap();
1590 assert!(
1591 !calls.is_empty(),
1592 "extraction must run when enough messages present"
1593 );
1594 let prompt = &calls[0];
1596 let user_text = prompt
1597 .iter()
1598 .filter(|m| m.role == Role::User)
1599 .map(|m| m.content.as_str())
1600 .collect::<Vec<_>>()
1601 .join(" ");
1602 assert!(
1604 !user_text.contains("I like message 8"),
1605 "message index 8 must be excluded from extraction input"
1606 );
1607 }
1608
1609 #[test]
1610 fn long_message_truncated_at_char_boundary() {
1611 let long_content = "x".repeat(3000);
1615 let truncated = if long_content.len() > 2048 {
1616 long_content[..long_content.floor_char_boundary(2048)].to_owned()
1617 } else {
1618 long_content.clone()
1619 };
1620 assert_eq!(
1621 truncated.len(),
1622 2048,
1623 "ASCII content must be truncated to exactly 2048 bytes"
1624 );
1625
1626 let multi = "é".repeat(1500); let truncated_multi = if multi.len() > 2048 {
1629 multi[..multi.floor_char_boundary(2048)].to_owned()
1630 } else {
1631 multi.clone()
1632 };
1633 assert!(
1634 truncated_multi.len() <= 2048,
1635 "multi-byte content must not exceed 2048 bytes"
1636 );
1637 assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1638 }
1639 }
1640
1641 #[tokio::test]
1642 async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1643 let provider = mock_provider(vec![]);
1644 let channel = MockChannel::new(vec![]);
1645 let registry = create_test_registry();
1646 let executor = MockToolExecutor::no_tools();
1647
1648 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1649 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1650 let cid = memory.sqlite().create_conversation().await.unwrap();
1651
1652 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1654 .with_metrics(tx)
1655 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1656 agent.services.memory.persistence.autosave_assistant = false;
1657 agent.services.memory.persistence.autosave_min_length = 20;
1658
1659 let long_user_msg = "A".repeat(100);
1660 agent
1661 .persist_message(Role::User, &long_user_msg, &[], false)
1662 .await;
1663
1664 let history = agent
1665 .services
1666 .memory
1667 .persistence
1668 .memory
1669 .as_ref()
1670 .unwrap()
1671 .sqlite()
1672 .load_history(cid, 50)
1673 .await
1674 .unwrap();
1675 assert_eq!(history.len(), 1, "user message must be saved");
1676 assert_eq!(rx.borrow().sqlite_message_count, 1);
1679 }
1680
1681 #[tokio::test]
1685 async fn persist_message_saves_correct_tool_use_parts() {
1686 use zeph_llm::provider::MessagePart;
1687
1688 let provider = mock_provider(vec![]);
1689 let channel = MockChannel::new(vec![]);
1690 let registry = create_test_registry();
1691 let executor = MockToolExecutor::no_tools();
1692
1693 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1694 let cid = memory.sqlite().create_conversation().await.unwrap();
1695
1696 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1697 std::sync::Arc::new(memory),
1698 cid,
1699 50,
1700 5,
1701 100,
1702 );
1703
1704 let parts = vec![MessagePart::ToolUse {
1705 id: "call_abc123".to_string(),
1706 name: "read_file".to_string(),
1707 input: serde_json::json!({"path": "/tmp/test.txt"}),
1708 }];
1709 let content = "[tool_use: read_file(call_abc123)]";
1710
1711 agent
1712 .persist_message(Role::Assistant, content, &parts, false)
1713 .await;
1714
1715 let history = agent
1716 .services
1717 .memory
1718 .persistence
1719 .memory
1720 .as_ref()
1721 .unwrap()
1722 .sqlite()
1723 .load_history(cid, 50)
1724 .await
1725 .unwrap();
1726
1727 assert_eq!(history.len(), 1);
1728 assert_eq!(history[0].role, Role::Assistant);
1729 assert_eq!(history[0].content, content);
1730 assert_eq!(history[0].parts.len(), 1);
1731 match &history[0].parts[0] {
1732 MessagePart::ToolUse { id, name, .. } => {
1733 assert_eq!(id, "call_abc123");
1734 assert_eq!(name, "read_file");
1735 }
1736 other => panic!("expected ToolUse part, got {other:?}"),
1737 }
1738 assert!(
1740 !history[0]
1741 .parts
1742 .iter()
1743 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1744 "assistant message must not contain ToolResult parts"
1745 );
1746 }
1747
1748 #[tokio::test]
1749 async fn persist_message_saves_correct_tool_result_parts() {
1750 use zeph_llm::provider::MessagePart;
1751
1752 let provider = mock_provider(vec![]);
1753 let channel = MockChannel::new(vec![]);
1754 let registry = create_test_registry();
1755 let executor = MockToolExecutor::no_tools();
1756
1757 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1758 let cid = memory.sqlite().create_conversation().await.unwrap();
1759
1760 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1761 std::sync::Arc::new(memory),
1762 cid,
1763 50,
1764 5,
1765 100,
1766 );
1767
1768 let parts = vec![MessagePart::ToolResult {
1769 tool_use_id: "call_abc123".to_string(),
1770 content: "file contents here".to_string(),
1771 is_error: false,
1772 }];
1773 let content = "[tool_result: call_abc123]\nfile contents here";
1774
1775 agent
1776 .persist_message(Role::User, content, &parts, false)
1777 .await;
1778
1779 let history = agent
1780 .services
1781 .memory
1782 .persistence
1783 .memory
1784 .as_ref()
1785 .unwrap()
1786 .sqlite()
1787 .load_history(cid, 50)
1788 .await
1789 .unwrap();
1790
1791 assert_eq!(history.len(), 1);
1792 assert_eq!(history[0].role, Role::User);
1793 assert_eq!(history[0].content, content);
1794 assert_eq!(history[0].parts.len(), 1);
1795 match &history[0].parts[0] {
1796 MessagePart::ToolResult {
1797 tool_use_id,
1798 content: result_content,
1799 is_error,
1800 } => {
1801 assert_eq!(tool_use_id, "call_abc123");
1802 assert_eq!(result_content, "file contents here");
1803 assert!(!is_error);
1804 }
1805 other => panic!("expected ToolResult part, got {other:?}"),
1806 }
1807 assert!(
1809 !history[0]
1810 .parts
1811 .iter()
1812 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1813 "user ToolResult message must not contain ToolUse parts"
1814 );
1815 }
1816
1817 #[tokio::test]
1818 async fn persist_message_roundtrip_preserves_role_part_alignment() {
1819 use zeph_llm::provider::MessagePart;
1820
1821 let provider = mock_provider(vec![]);
1822 let channel = MockChannel::new(vec![]);
1823 let registry = create_test_registry();
1824 let executor = MockToolExecutor::no_tools();
1825
1826 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1827 let cid = memory.sqlite().create_conversation().await.unwrap();
1828
1829 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1830 std::sync::Arc::new(memory),
1831 cid,
1832 50,
1833 5,
1834 100,
1835 );
1836
1837 let assistant_parts = vec![MessagePart::ToolUse {
1839 id: "id_1".to_string(),
1840 name: "list_dir".to_string(),
1841 input: serde_json::json!({"path": "/tmp"}),
1842 }];
1843 agent
1844 .persist_message(
1845 Role::Assistant,
1846 "[tool_use: list_dir(id_1)]",
1847 &assistant_parts,
1848 false,
1849 )
1850 .await;
1851
1852 let user_parts = vec![MessagePart::ToolResult {
1854 tool_use_id: "id_1".to_string(),
1855 content: "file1.txt\nfile2.txt".to_string(),
1856 is_error: false,
1857 }];
1858 agent
1859 .persist_message(
1860 Role::User,
1861 "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1862 &user_parts,
1863 false,
1864 )
1865 .await;
1866
1867 let history = agent
1868 .services
1869 .memory
1870 .persistence
1871 .memory
1872 .as_ref()
1873 .unwrap()
1874 .sqlite()
1875 .load_history(cid, 50)
1876 .await
1877 .unwrap();
1878
1879 assert_eq!(history.len(), 2);
1880
1881 assert_eq!(history[0].role, Role::Assistant);
1883 assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1884 assert!(
1885 matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1886 "first message must be assistant ToolUse"
1887 );
1888
1889 assert_eq!(history[1].role, Role::User);
1891 assert_eq!(
1892 history[1].content,
1893 "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1894 );
1895 assert!(
1896 matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1897 "second message must be user ToolResult"
1898 );
1899
1900 assert!(
1902 !history[0]
1903 .parts
1904 .iter()
1905 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1906 "assistant message must not have ToolResult parts"
1907 );
1908 assert!(
1909 !history[1]
1910 .parts
1911 .iter()
1912 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1913 "user message must not have ToolUse parts"
1914 );
1915 }
1916
1917 #[tokio::test]
1918 async fn persist_message_saves_correct_tool_output_parts() {
1919 use zeph_llm::provider::MessagePart;
1920
1921 let provider = mock_provider(vec![]);
1922 let channel = MockChannel::new(vec![]);
1923 let registry = create_test_registry();
1924 let executor = MockToolExecutor::no_tools();
1925
1926 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1927 let cid = memory.sqlite().create_conversation().await.unwrap();
1928
1929 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1930 std::sync::Arc::new(memory),
1931 cid,
1932 50,
1933 5,
1934 100,
1935 );
1936
1937 let parts = vec![MessagePart::ToolOutput {
1938 tool_name: "shell".into(),
1939 body: "hello from shell".to_string(),
1940 compacted_at: None,
1941 }];
1942 let content = "[tool: shell]\nhello from shell";
1943
1944 agent
1945 .persist_message(Role::User, content, &parts, false)
1946 .await;
1947
1948 let history = agent
1949 .services
1950 .memory
1951 .persistence
1952 .memory
1953 .as_ref()
1954 .unwrap()
1955 .sqlite()
1956 .load_history(cid, 50)
1957 .await
1958 .unwrap();
1959
1960 assert_eq!(history.len(), 1);
1961 assert_eq!(history[0].role, Role::User);
1962 assert_eq!(history[0].content, content);
1963 assert_eq!(history[0].parts.len(), 1);
1964 match &history[0].parts[0] {
1965 MessagePart::ToolOutput {
1966 tool_name,
1967 body,
1968 compacted_at,
1969 } => {
1970 assert_eq!(tool_name, "shell");
1971 assert_eq!(body, "hello from shell");
1972 assert!(compacted_at.is_none());
1973 }
1974 other => panic!("expected ToolOutput part, got {other:?}"),
1975 }
1976 }
1977
1978 #[tokio::test]
1981 async fn load_history_removes_trailing_orphan_tool_use() {
1982 use zeph_llm::provider::MessagePart;
1983
1984 let provider = mock_provider(vec![]);
1985 let channel = MockChannel::new(vec![]);
1986 let registry = create_test_registry();
1987 let executor = MockToolExecutor::no_tools();
1988
1989 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1990 let cid = memory.sqlite().create_conversation().await.unwrap();
1991 let sqlite = memory.sqlite();
1992
1993 sqlite
1995 .save_message(cid, "user", "do something with a tool")
1996 .await
1997 .unwrap();
1998
1999 let parts = serde_json::to_string(&[MessagePart::ToolUse {
2001 id: "call_orphan".to_string(),
2002 name: "shell".to_string(),
2003 input: serde_json::json!({"command": "ls"}),
2004 }])
2005 .unwrap();
2006 sqlite
2007 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2008 .await
2009 .unwrap();
2010
2011 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2012 std::sync::Arc::new(memory),
2013 cid,
2014 50,
2015 5,
2016 100,
2017 );
2018
2019 let messages_before = agent.msg.messages.len();
2020 agent.load_history().await.unwrap();
2021
2022 assert_eq!(
2024 agent.msg.messages.len(),
2025 messages_before + 1,
2026 "orphaned trailing tool_use must be removed"
2027 );
2028 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2029 }
2030
2031 #[tokio::test]
2032 async fn load_history_removes_leading_orphan_tool_result() {
2033 use zeph_llm::provider::MessagePart;
2034
2035 let provider = mock_provider(vec![]);
2036 let channel = MockChannel::new(vec![]);
2037 let registry = create_test_registry();
2038 let executor = MockToolExecutor::no_tools();
2039
2040 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2041 let cid = memory.sqlite().create_conversation().await.unwrap();
2042 let sqlite = memory.sqlite();
2043
2044 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2046 tool_use_id: "call_missing".to_string(),
2047 content: "result data".to_string(),
2048 is_error: false,
2049 }])
2050 .unwrap();
2051 sqlite
2052 .save_message_with_parts(
2053 cid,
2054 "user",
2055 "[tool_result: call_missing]\nresult data",
2056 &result_parts,
2057 )
2058 .await
2059 .unwrap();
2060
2061 sqlite
2063 .save_message(cid, "assistant", "here is my response")
2064 .await
2065 .unwrap();
2066
2067 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2068 std::sync::Arc::new(memory),
2069 cid,
2070 50,
2071 5,
2072 100,
2073 );
2074
2075 let messages_before = agent.msg.messages.len();
2076 agent.load_history().await.unwrap();
2077
2078 assert_eq!(
2080 agent.msg.messages.len(),
2081 messages_before + 1,
2082 "orphaned leading tool_result must be removed"
2083 );
2084 assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2085 }
2086
2087 #[tokio::test]
2088 async fn load_history_preserves_complete_tool_pairs() {
2089 use zeph_llm::provider::MessagePart;
2090
2091 let provider = mock_provider(vec![]);
2092 let channel = MockChannel::new(vec![]);
2093 let registry = create_test_registry();
2094 let executor = MockToolExecutor::no_tools();
2095
2096 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2097 let cid = memory.sqlite().create_conversation().await.unwrap();
2098 let sqlite = memory.sqlite();
2099
2100 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2102 id: "call_ok".to_string(),
2103 name: "shell".to_string(),
2104 input: serde_json::json!({"command": "pwd"}),
2105 }])
2106 .unwrap();
2107 sqlite
2108 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2109 .await
2110 .unwrap();
2111
2112 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2113 tool_use_id: "call_ok".to_string(),
2114 content: "/home/user".to_string(),
2115 is_error: false,
2116 }])
2117 .unwrap();
2118 sqlite
2119 .save_message_with_parts(
2120 cid,
2121 "user",
2122 "[tool_result: call_ok]\n/home/user",
2123 &result_parts,
2124 )
2125 .await
2126 .unwrap();
2127
2128 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2129 std::sync::Arc::new(memory),
2130 cid,
2131 50,
2132 5,
2133 100,
2134 );
2135
2136 let messages_before = agent.msg.messages.len();
2137 agent.load_history().await.unwrap();
2138
2139 assert_eq!(
2141 agent.msg.messages.len(),
2142 messages_before + 2,
2143 "complete tool_use/tool_result pair must be preserved"
2144 );
2145 assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2146 assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2147 }
2148
2149 #[tokio::test]
2150 async fn load_history_handles_multiple_trailing_orphans() {
2151 use zeph_llm::provider::MessagePart;
2152
2153 let provider = mock_provider(vec![]);
2154 let channel = MockChannel::new(vec![]);
2155 let registry = create_test_registry();
2156 let executor = MockToolExecutor::no_tools();
2157
2158 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2159 let cid = memory.sqlite().create_conversation().await.unwrap();
2160 let sqlite = memory.sqlite();
2161
2162 sqlite.save_message(cid, "user", "start").await.unwrap();
2164
2165 let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2167 id: "call_1".to_string(),
2168 name: "shell".to_string(),
2169 input: serde_json::json!({}),
2170 }])
2171 .unwrap();
2172 sqlite
2173 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2174 .await
2175 .unwrap();
2176
2177 let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2179 id: "call_2".to_string(),
2180 name: "read_file".to_string(),
2181 input: serde_json::json!({}),
2182 }])
2183 .unwrap();
2184 sqlite
2185 .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2186 .await
2187 .unwrap();
2188
2189 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2190 std::sync::Arc::new(memory),
2191 cid,
2192 50,
2193 5,
2194 100,
2195 );
2196
2197 let messages_before = agent.msg.messages.len();
2198 agent.load_history().await.unwrap();
2199
2200 assert_eq!(
2202 agent.msg.messages.len(),
2203 messages_before + 1,
2204 "all trailing orphaned tool_use messages must be removed"
2205 );
2206 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2207 }
2208
2209 #[tokio::test]
2210 async fn load_history_no_tool_messages_unchanged() {
2211 let provider = mock_provider(vec![]);
2212 let channel = MockChannel::new(vec![]);
2213 let registry = create_test_registry();
2214 let executor = MockToolExecutor::no_tools();
2215
2216 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2217 let cid = memory.sqlite().create_conversation().await.unwrap();
2218 let sqlite = memory.sqlite();
2219
2220 sqlite.save_message(cid, "user", "hello").await.unwrap();
2221 sqlite
2222 .save_message(cid, "assistant", "hi there")
2223 .await
2224 .unwrap();
2225 sqlite
2226 .save_message(cid, "user", "how are you?")
2227 .await
2228 .unwrap();
2229
2230 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2231 std::sync::Arc::new(memory),
2232 cid,
2233 50,
2234 5,
2235 100,
2236 );
2237
2238 let messages_before = agent.msg.messages.len();
2239 agent.load_history().await.unwrap();
2240
2241 assert_eq!(
2243 agent.msg.messages.len(),
2244 messages_before + 3,
2245 "plain messages without tool parts must pass through unchanged"
2246 );
2247 }
2248
2249 #[tokio::test]
2250 async fn load_history_removes_both_leading_and_trailing_orphans() {
2251 use zeph_llm::provider::MessagePart;
2252
2253 let provider = mock_provider(vec![]);
2254 let channel = MockChannel::new(vec![]);
2255 let registry = create_test_registry();
2256 let executor = MockToolExecutor::no_tools();
2257
2258 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2259 let cid = memory.sqlite().create_conversation().await.unwrap();
2260 let sqlite = memory.sqlite();
2261
2262 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2264 tool_use_id: "call_leading".to_string(),
2265 content: "orphaned result".to_string(),
2266 is_error: false,
2267 }])
2268 .unwrap();
2269 sqlite
2270 .save_message_with_parts(
2271 cid,
2272 "user",
2273 "[tool_result: call_leading]\norphaned result",
2274 &result_parts,
2275 )
2276 .await
2277 .unwrap();
2278
2279 sqlite
2281 .save_message(cid, "user", "what is 2+2?")
2282 .await
2283 .unwrap();
2284 sqlite.save_message(cid, "assistant", "4").await.unwrap();
2285
2286 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2288 id: "call_trailing".to_string(),
2289 name: "shell".to_string(),
2290 input: serde_json::json!({"command": "date"}),
2291 }])
2292 .unwrap();
2293 sqlite
2294 .save_message_with_parts(
2295 cid,
2296 "assistant",
2297 "[tool_use: shell(call_trailing)]",
2298 &use_parts,
2299 )
2300 .await
2301 .unwrap();
2302
2303 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2304 std::sync::Arc::new(memory),
2305 cid,
2306 50,
2307 5,
2308 100,
2309 );
2310
2311 let messages_before = agent.msg.messages.len();
2312 agent.load_history().await.unwrap();
2313
2314 assert_eq!(
2316 agent.msg.messages.len(),
2317 messages_before + 2,
2318 "both leading and trailing orphans must be removed"
2319 );
2320 assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2321 assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2322 assert_eq!(
2323 agent.msg.messages[messages_before + 1].role,
2324 Role::Assistant
2325 );
2326 assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2327 }
2328
2329 #[tokio::test]
2334 async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2335 use zeph_llm::provider::MessagePart;
2336
2337 let provider = mock_provider(vec![]);
2338 let channel = MockChannel::new(vec![]);
2339 let registry = create_test_registry();
2340 let executor = MockToolExecutor::no_tools();
2341
2342 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2343 let cid = memory.sqlite().create_conversation().await.unwrap();
2344 let sqlite = memory.sqlite();
2345
2346 sqlite
2348 .save_message(cid, "user", "first question")
2349 .await
2350 .unwrap();
2351 sqlite
2352 .save_message(cid, "assistant", "first answer")
2353 .await
2354 .unwrap();
2355
2356 let use_parts = serde_json::to_string(&[
2360 MessagePart::ToolUse {
2361 id: "call_mid_1".to_string(),
2362 name: "shell".to_string(),
2363 input: serde_json::json!({"command": "ls"}),
2364 },
2365 MessagePart::Text {
2366 text: "Let me check the files.".to_string(),
2367 },
2368 ])
2369 .unwrap();
2370 sqlite
2371 .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2372 .await
2373 .unwrap();
2374
2375 sqlite
2377 .save_message(cid, "user", "second question")
2378 .await
2379 .unwrap();
2380 sqlite
2381 .save_message(cid, "assistant", "second answer")
2382 .await
2383 .unwrap();
2384
2385 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2386 std::sync::Arc::new(memory),
2387 cid,
2388 50,
2389 5,
2390 100,
2391 );
2392
2393 let messages_before = agent.msg.messages.len();
2394 agent.load_history().await.unwrap();
2395
2396 assert_eq!(
2399 agent.msg.messages.len(),
2400 messages_before + 5,
2401 "message count must be 5 (orphan message kept — has text content)"
2402 );
2403
2404 let orphan = &agent.msg.messages[messages_before + 2];
2406 assert_eq!(orphan.role, Role::Assistant);
2407 assert!(
2408 !orphan
2409 .parts
2410 .iter()
2411 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2412 "orphaned ToolUse parts must be stripped from mid-history message"
2413 );
2414 assert!(
2416 orphan.parts.iter().any(
2417 |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2418 ),
2419 "text content of orphaned assistant message must be preserved"
2420 );
2421 }
2422
2423 #[tokio::test]
2428 async fn load_history_keeps_tool_only_user_message() {
2429 use zeph_llm::provider::MessagePart;
2430
2431 let provider = mock_provider(vec![]);
2432 let channel = MockChannel::new(vec![]);
2433 let registry = create_test_registry();
2434 let executor = MockToolExecutor::no_tools();
2435
2436 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2437 let cid = memory.sqlite().create_conversation().await.unwrap();
2438 let sqlite = memory.sqlite();
2439
2440 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2442 id: "call_rc3".to_string(),
2443 name: "memory_save".to_string(),
2444 input: serde_json::json!({"content": "something"}),
2445 }])
2446 .unwrap();
2447 sqlite
2448 .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2449 .await
2450 .unwrap();
2451
2452 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2454 tool_use_id: "call_rc3".to_string(),
2455 content: "saved".to_string(),
2456 is_error: false,
2457 }])
2458 .unwrap();
2459 sqlite
2460 .save_message_with_parts(cid, "user", "", &result_parts)
2461 .await
2462 .unwrap();
2463
2464 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2465
2466 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2467 std::sync::Arc::new(memory),
2468 cid,
2469 50,
2470 5,
2471 100,
2472 );
2473
2474 let messages_before = agent.msg.messages.len();
2475 agent.load_history().await.unwrap();
2476
2477 assert_eq!(
2480 agent.msg.messages.len(),
2481 messages_before + 3,
2482 "user message with empty content but ToolResult parts must not be dropped"
2483 );
2484
2485 let user_msg = &agent.msg.messages[messages_before + 1];
2487 assert_eq!(user_msg.role, Role::User);
2488 assert!(
2489 user_msg.parts.iter().any(
2490 |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2491 ),
2492 "ToolResult part must be preserved on user message with empty content"
2493 );
2494 }
2495
2496 #[tokio::test]
2500 async fn strip_orphans_removes_orphaned_tool_result() {
2501 use zeph_llm::provider::MessagePart;
2502
2503 let provider = mock_provider(vec![]);
2504 let channel = MockChannel::new(vec![]);
2505 let registry = create_test_registry();
2506 let executor = MockToolExecutor::no_tools();
2507
2508 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2509 let cid = memory.sqlite().create_conversation().await.unwrap();
2510 let sqlite = memory.sqlite();
2511
2512 sqlite.save_message(cid, "user", "hello").await.unwrap();
2514 sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2515
2516 sqlite
2518 .save_message(cid, "assistant", "plain answer")
2519 .await
2520 .unwrap();
2521
2522 let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2524 tool_use_id: "call_nonexistent".to_string(),
2525 content: "stale result".to_string(),
2526 is_error: false,
2527 }])
2528 .unwrap();
2529 sqlite
2530 .save_message_with_parts(
2531 cid,
2532 "user",
2533 "[tool_result: call_nonexistent]\nstale result",
2534 &orphan_result_parts,
2535 )
2536 .await
2537 .unwrap();
2538
2539 sqlite
2540 .save_message(cid, "assistant", "final")
2541 .await
2542 .unwrap();
2543
2544 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2545 std::sync::Arc::new(memory),
2546 cid,
2547 50,
2548 5,
2549 100,
2550 );
2551
2552 let messages_before = agent.msg.messages.len();
2553 agent.load_history().await.unwrap();
2554
2555 let loaded = &agent.msg.messages[messages_before..];
2559 for msg in loaded {
2560 assert!(
2561 !msg.parts.iter().any(|p| matches!(
2562 p,
2563 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2564 )),
2565 "orphaned ToolResult part must be stripped from history"
2566 );
2567 }
2568 }
2569
2570 #[tokio::test]
2573 async fn strip_orphans_keeps_complete_pair() {
2574 use zeph_llm::provider::MessagePart;
2575
2576 let provider = mock_provider(vec![]);
2577 let channel = MockChannel::new(vec![]);
2578 let registry = create_test_registry();
2579 let executor = MockToolExecutor::no_tools();
2580
2581 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2582 let cid = memory.sqlite().create_conversation().await.unwrap();
2583 let sqlite = memory.sqlite();
2584
2585 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2586 id: "call_valid".to_string(),
2587 name: "shell".to_string(),
2588 input: serde_json::json!({"command": "ls"}),
2589 }])
2590 .unwrap();
2591 sqlite
2592 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2593 .await
2594 .unwrap();
2595
2596 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2597 tool_use_id: "call_valid".to_string(),
2598 content: "file.rs".to_string(),
2599 is_error: false,
2600 }])
2601 .unwrap();
2602 sqlite
2603 .save_message_with_parts(cid, "user", "", &result_parts)
2604 .await
2605 .unwrap();
2606
2607 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2608 std::sync::Arc::new(memory),
2609 cid,
2610 50,
2611 5,
2612 100,
2613 );
2614
2615 let messages_before = agent.msg.messages.len();
2616 agent.load_history().await.unwrap();
2617
2618 assert_eq!(
2619 agent.msg.messages.len(),
2620 messages_before + 2,
2621 "complete tool_use/tool_result pair must be preserved"
2622 );
2623
2624 let user_msg = &agent.msg.messages[messages_before + 1];
2625 assert!(
2626 user_msg.parts.iter().any(|p| matches!(
2627 p,
2628 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2629 )),
2630 "ToolResult part for a matched tool_use must not be stripped"
2631 );
2632 }
2633
2634 #[tokio::test]
2637 async fn strip_orphans_mixed_history() {
2638 use zeph_llm::provider::MessagePart;
2639
2640 let provider = mock_provider(vec![]);
2641 let channel = MockChannel::new(vec![]);
2642 let registry = create_test_registry();
2643 let executor = MockToolExecutor::no_tools();
2644
2645 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2646 let cid = memory.sqlite().create_conversation().await.unwrap();
2647 let sqlite = memory.sqlite();
2648
2649 let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2651 id: "call_good".to_string(),
2652 name: "shell".to_string(),
2653 input: serde_json::json!({"command": "pwd"}),
2654 }])
2655 .unwrap();
2656 sqlite
2657 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2658 .await
2659 .unwrap();
2660
2661 let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2662 tool_use_id: "call_good".to_string(),
2663 content: "/home".to_string(),
2664 is_error: false,
2665 }])
2666 .unwrap();
2667 sqlite
2668 .save_message_with_parts(cid, "user", "", &result_parts_ok)
2669 .await
2670 .unwrap();
2671
2672 sqlite
2674 .save_message(cid, "assistant", "text only")
2675 .await
2676 .unwrap();
2677
2678 let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2679 tool_use_id: "call_ghost".to_string(),
2680 content: "ghost result".to_string(),
2681 is_error: false,
2682 }])
2683 .unwrap();
2684 sqlite
2685 .save_message_with_parts(
2686 cid,
2687 "user",
2688 "[tool_result: call_ghost]\nghost result",
2689 &orphan_parts,
2690 )
2691 .await
2692 .unwrap();
2693
2694 sqlite
2695 .save_message(cid, "assistant", "final reply")
2696 .await
2697 .unwrap();
2698
2699 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2700 std::sync::Arc::new(memory),
2701 cid,
2702 50,
2703 5,
2704 100,
2705 );
2706
2707 let messages_before = agent.msg.messages.len();
2708 agent.load_history().await.unwrap();
2709
2710 let loaded = &agent.msg.messages[messages_before..];
2711
2712 for msg in loaded {
2714 assert!(
2715 !msg.parts.iter().any(|p| matches!(
2716 p,
2717 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2718 )),
2719 "orphaned ToolResult (call_ghost) must be stripped from history"
2720 );
2721 }
2722
2723 let has_good_result = loaded.iter().any(|msg| {
2726 msg.role == Role::User
2727 && msg.parts.iter().any(|p| {
2728 matches!(
2729 p,
2730 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2731 )
2732 })
2733 });
2734 assert!(
2735 has_good_result,
2736 "matched ToolResult (call_good) must be preserved in history"
2737 );
2738 }
2739
2740 #[tokio::test]
2743 async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2744 use zeph_llm::provider::MessagePart;
2745
2746 let provider = mock_provider(vec![]);
2747 let channel = MockChannel::new(vec![]);
2748 let registry = create_test_registry();
2749 let executor = MockToolExecutor::no_tools();
2750
2751 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2752 let cid = memory.sqlite().create_conversation().await.unwrap();
2753 let sqlite = memory.sqlite();
2754
2755 sqlite
2756 .save_message(cid, "user", "run a command")
2757 .await
2758 .unwrap();
2759
2760 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2762 id: "call_ok".to_string(),
2763 name: "shell".to_string(),
2764 input: serde_json::json!({"command": "echo hi"}),
2765 }])
2766 .unwrap();
2767 sqlite
2768 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2769 .await
2770 .unwrap();
2771
2772 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2774 tool_use_id: "call_ok".to_string(),
2775 content: "hi".to_string(),
2776 is_error: false,
2777 }])
2778 .unwrap();
2779 sqlite
2780 .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2781 .await
2782 .unwrap();
2783
2784 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2785
2786 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2787 std::sync::Arc::new(memory),
2788 cid,
2789 50,
2790 5,
2791 100,
2792 );
2793
2794 let messages_before = agent.msg.messages.len();
2795 agent.load_history().await.unwrap();
2796
2797 assert_eq!(
2799 agent.msg.messages.len(),
2800 messages_before + 4,
2801 "matched tool pair must not be removed"
2802 );
2803 let tool_msg = &agent.msg.messages[messages_before + 1];
2804 assert!(
2805 tool_msg
2806 .parts
2807 .iter()
2808 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2809 "matched ToolUse parts must be preserved"
2810 );
2811 }
2812
2813 #[tokio::test]
2817 async fn persist_cancelled_tool_results_pairs_tool_use() {
2818 use zeph_llm::provider::MessagePart;
2819
2820 let provider = mock_provider(vec![]);
2821 let channel = MockChannel::new(vec![]);
2822 let registry = create_test_registry();
2823 let executor = MockToolExecutor::no_tools();
2824
2825 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2826 let cid = memory.sqlite().create_conversation().await.unwrap();
2827
2828 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2829 std::sync::Arc::new(memory),
2830 cid,
2831 50,
2832 5,
2833 100,
2834 );
2835
2836 let tool_calls = vec![
2838 zeph_llm::provider::ToolUseRequest {
2839 id: "cancel_id_1".to_string(),
2840 name: "shell".to_string().into(),
2841 input: serde_json::json!({}),
2842 },
2843 zeph_llm::provider::ToolUseRequest {
2844 id: "cancel_id_2".to_string(),
2845 name: "read_file".to_string().into(),
2846 input: serde_json::json!({}),
2847 },
2848 ];
2849
2850 agent.persist_cancelled_tool_results(&tool_calls).await;
2851
2852 let history = agent
2853 .services
2854 .memory
2855 .persistence
2856 .memory
2857 .as_ref()
2858 .unwrap()
2859 .sqlite()
2860 .load_history(cid, 50)
2861 .await
2862 .unwrap();
2863
2864 assert_eq!(history.len(), 1);
2866 assert_eq!(history[0].role, Role::User);
2867
2868 for tc in &tool_calls {
2870 assert!(
2871 history[0].parts.iter().any(|p| matches!(
2872 p,
2873 MessagePart::ToolResult { tool_use_id, is_error, .. }
2874 if tool_use_id == &tc.id && *is_error
2875 )),
2876 "tombstone ToolResult for {} must be present and is_error=true",
2877 tc.id
2878 );
2879 }
2880 }
2881
2882 #[tokio::test]
2893 async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
2894 use zeph_llm::provider::MessagePart;
2895
2896 let provider = mock_provider(vec![]);
2897 let channel = MockChannel::new(vec![]);
2898 let registry = create_test_registry();
2899 let executor = MockToolExecutor::no_tools();
2900
2901 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2902 let cid = memory.sqlite().create_conversation().await.unwrap();
2903 let sqlite = memory.sqlite();
2904
2905 sqlite
2907 .save_message(cid, "user", "save this for me")
2908 .await
2909 .unwrap();
2910
2911 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2914 id: "call_2529".to_string(),
2915 name: "memory_save".to_string(),
2916 input: serde_json::json!({"content": "save this"}),
2917 }])
2918 .unwrap();
2919 let orphan_assistant_id = sqlite
2920 .save_message_with_parts(
2921 cid,
2922 "assistant",
2923 "[tool_use: memory_save(call_2529)]",
2924 &use_parts,
2925 )
2926 .await
2927 .unwrap();
2928
2929 sqlite
2934 .save_message(cid, "assistant", "here is a plain reply")
2935 .await
2936 .unwrap();
2937
2938 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2939 tool_use_id: "call_2529".to_string(),
2940 content: "saved".to_string(),
2941 is_error: false,
2942 }])
2943 .unwrap();
2944 let orphan_user_id = sqlite
2945 .save_message_with_parts(
2946 cid,
2947 "user",
2948 "[tool_result: call_2529]\nsaved",
2949 &result_parts,
2950 )
2951 .await
2952 .unwrap();
2953
2954 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2956
2957 let memory_arc = std::sync::Arc::new(memory);
2958 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2959 memory_arc.clone(),
2960 cid,
2961 50,
2962 5,
2963 100,
2964 );
2965
2966 agent.load_history().await.unwrap();
2967
2968 let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
2971 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
2972 )
2973 .bind(orphan_assistant_id)
2974 .fetch_all(memory_arc.sqlite().pool())
2975 .await
2976 .unwrap();
2977
2978 let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
2979 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
2980 )
2981 .bind(orphan_user_id)
2982 .fetch_all(memory_arc.sqlite().pool())
2983 .await
2984 .unwrap();
2985
2986 assert_eq!(
2987 assistant_deleted_count.first().copied().unwrap_or(0),
2988 1,
2989 "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
2990 );
2991 assert_eq!(
2992 user_deleted_count.first().copied().unwrap_or(0),
2993 1,
2994 "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
2995 );
2996 }
2997
2998 #[tokio::test]
3002 async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3003 use zeph_llm::provider::MessagePart;
3004
3005 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3006 let cid = memory.sqlite().create_conversation().await.unwrap();
3007 let sqlite = memory.sqlite();
3008
3009 sqlite
3011 .save_message(cid, "user", "do something")
3012 .await
3013 .unwrap();
3014
3015 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3017 id: "call_idem".to_string(),
3018 name: "shell".to_string(),
3019 input: serde_json::json!({"command": "ls"}),
3020 }])
3021 .unwrap();
3022 sqlite
3023 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3024 .await
3025 .unwrap();
3026
3027 sqlite
3029 .save_message(cid, "assistant", "continuing")
3030 .await
3031 .unwrap();
3032
3033 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3035 tool_use_id: "call_idem".to_string(),
3036 content: "output".to_string(),
3037 is_error: false,
3038 }])
3039 .unwrap();
3040 sqlite
3041 .save_message_with_parts(
3042 cid,
3043 "user",
3044 "[tool_result: call_idem]\noutput",
3045 &result_parts,
3046 )
3047 .await
3048 .unwrap();
3049
3050 sqlite
3051 .save_message(cid, "assistant", "final")
3052 .await
3053 .unwrap();
3054
3055 let memory_arc = std::sync::Arc::new(memory);
3056
3057 let mut agent1 = Agent::new(
3059 mock_provider(vec![]),
3060 MockChannel::new(vec![]),
3061 create_test_registry(),
3062 None,
3063 5,
3064 MockToolExecutor::no_tools(),
3065 )
3066 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3067 agent1.load_history().await.unwrap();
3068 let count_after_first = agent1.msg.messages.len();
3069
3070 let mut agent2 = Agent::new(
3073 mock_provider(vec![]),
3074 MockChannel::new(vec![]),
3075 create_test_registry(),
3076 None,
3077 5,
3078 MockToolExecutor::no_tools(),
3079 )
3080 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3081 agent2.load_history().await.unwrap();
3082 let count_after_second = agent2.msg.messages.len();
3083
3084 assert_eq!(
3086 count_after_first, count_after_second,
3087 "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3088 );
3089 }
3090
3091 #[tokio::test]
3095 async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3096 use zeph_llm::provider::MessagePart;
3097
3098 let provider = mock_provider(vec![]);
3099 let channel = MockChannel::new(vec![]);
3100 let registry = create_test_registry();
3101 let executor = MockToolExecutor::no_tools();
3102
3103 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3104 let cid = memory.sqlite().create_conversation().await.unwrap();
3105 let sqlite = memory.sqlite();
3106
3107 sqlite
3109 .save_message(cid, "user", "check the files")
3110 .await
3111 .unwrap();
3112
3113 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3116 id: "call_mixed".to_string(),
3117 name: "shell".to_string(),
3118 input: serde_json::json!({"command": "ls"}),
3119 }])
3120 .unwrap();
3121 sqlite
3122 .save_message_with_parts(
3123 cid,
3124 "assistant",
3125 "Let me list the directory. [tool_use: shell(call_mixed)]",
3126 &use_parts,
3127 )
3128 .await
3129 .unwrap();
3130
3131 sqlite.save_message(cid, "user", "thanks").await.unwrap();
3133 sqlite
3134 .save_message(cid, "assistant", "you are welcome")
3135 .await
3136 .unwrap();
3137
3138 let memory_arc = std::sync::Arc::new(memory);
3139 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3140 memory_arc.clone(),
3141 cid,
3142 50,
3143 5,
3144 100,
3145 );
3146
3147 let messages_before = agent.msg.messages.len();
3148 agent.load_history().await.unwrap();
3149
3150 assert_eq!(
3152 agent.msg.messages.len(),
3153 messages_before + 4,
3154 "assistant message with text + tool tag must not be removed after ToolUse strip"
3155 );
3156
3157 let mixed_msg = agent
3159 .msg
3160 .messages
3161 .iter()
3162 .find(|m| m.content.contains("Let me list the directory"))
3163 .expect("mixed-content assistant message must still be in history");
3164 assert!(
3165 !mixed_msg
3166 .parts
3167 .iter()
3168 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3169 "orphaned ToolUse parts must be stripped even when message has meaningful text"
3170 );
3171 assert_eq!(
3172 mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3173 "content field must be unchanged — only parts are stripped"
3174 );
3175 }
3176
3177 #[tokio::test]
3180 async fn persist_message_skipped_tool_result_does_not_embed() {
3181 use zeph_llm::provider::MessagePart;
3182
3183 let provider = mock_provider(vec![]);
3184 let channel = MockChannel::new(vec![]);
3185 let registry = create_test_registry();
3186 let executor = MockToolExecutor::no_tools();
3187
3188 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3189 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3190 let cid = memory.sqlite().create_conversation().await.unwrap();
3191
3192 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3193 .with_metrics(tx)
3194 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3195 agent.services.memory.persistence.autosave_assistant = true;
3196 agent.services.memory.persistence.autosave_min_length = 0;
3197
3198 let parts = vec![MessagePart::ToolResult {
3199 tool_use_id: "tu1".into(),
3200 content: "[skipped] bash tool was blocked by utility gate".into(),
3201 is_error: false,
3202 }];
3203
3204 agent
3205 .persist_message(
3206 Role::User,
3207 "[skipped] bash tool was blocked by utility gate",
3208 &parts,
3209 false,
3210 )
3211 .await;
3212
3213 assert_eq!(
3214 rx.borrow().embeddings_generated,
3215 0,
3216 "[skipped] ToolResult must not be embedded into Qdrant"
3217 );
3218 }
3219
3220 #[tokio::test]
3221 async fn persist_message_stopped_tool_result_does_not_embed() {
3222 use zeph_llm::provider::MessagePart;
3223
3224 let provider = mock_provider(vec![]);
3225 let channel = MockChannel::new(vec![]);
3226 let registry = create_test_registry();
3227 let executor = MockToolExecutor::no_tools();
3228
3229 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3230 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3231 let cid = memory.sqlite().create_conversation().await.unwrap();
3232
3233 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3234 .with_metrics(tx)
3235 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3236 agent.services.memory.persistence.autosave_assistant = true;
3237 agent.services.memory.persistence.autosave_min_length = 0;
3238
3239 let parts = vec![MessagePart::ToolResult {
3240 tool_use_id: "tu2".into(),
3241 content: "[stopped] execution limit reached".into(),
3242 is_error: false,
3243 }];
3244
3245 agent
3246 .persist_message(
3247 Role::User,
3248 "[stopped] execution limit reached",
3249 &parts,
3250 false,
3251 )
3252 .await;
3253
3254 assert_eq!(
3255 rx.borrow().embeddings_generated,
3256 0,
3257 "[stopped] ToolResult must not be embedded into Qdrant"
3258 );
3259 }
3260
3261 #[tokio::test]
3262 async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3263 use zeph_llm::provider::MessagePart;
3266
3267 let provider = mock_provider(vec![]);
3268 let channel = MockChannel::new(vec![]);
3269 let registry = create_test_registry();
3270 let executor = MockToolExecutor::no_tools();
3271
3272 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3273 let cid = memory.sqlite().create_conversation().await.unwrap();
3274 let memory_arc = std::sync::Arc::new(memory);
3275
3276 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3277 memory_arc.clone(),
3278 cid,
3279 50,
3280 5,
3281 100,
3282 );
3283 agent.services.memory.persistence.autosave_assistant = true;
3284 agent.services.memory.persistence.autosave_min_length = 0;
3285
3286 let content = "total 42\ndrwxr-xr-x 5 user group";
3287 let parts = vec![MessagePart::ToolResult {
3288 tool_use_id: "tu3".into(),
3289 content: content.into(),
3290 is_error: false,
3291 }];
3292
3293 agent
3294 .persist_message(Role::User, content, &parts, false)
3295 .await;
3296
3297 let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3299 assert_eq!(
3300 history.len(),
3301 1,
3302 "normal ToolResult must be saved to SQLite"
3303 );
3304 assert_eq!(history[0].content, content);
3305 }
3306
#[test]
// Checks the tail-window arithmetic for a history longer than the cap: with 100
// messages and a cap of 20, the window starts at index 80 and spans 20 items.
fn trajectory_extraction_slice_bounds_messages() {
    const MAX_MESSAGES: usize = 20;
    const TOTAL_MESSAGES: usize = 100;

    // `saturating_sub` clamps at zero, so the start index can never underflow.
    let tail_start = TOTAL_MESSAGES.saturating_sub(MAX_MESSAGES);
    let window = TOTAL_MESSAGES - tail_start;

    assert_eq!(
        window, 20,
        "slice should contain exactly max_messages items"
    );
    assert_eq!(tail_start, 80, "slice should start at len - max_messages");
}
3326
#[test]
// Checks the tail-window arithmetic for a history shorter than the cap: with 5
// messages and a cap of 20, the window starts at index 0 and spans all 5 items.
fn trajectory_extraction_slice_handles_few_messages() {
    const MAX_MESSAGES: usize = 20;
    const TOTAL_MESSAGES: usize = 5;

    // `saturating_sub` yields 0 here because the cap exceeds the total.
    let tail_start = TOTAL_MESSAGES.saturating_sub(MAX_MESSAGES);
    let window = TOTAL_MESSAGES - tail_start;

    assert_eq!(window, 5, "should return all messages when fewer than max");
    assert_eq!(tail_start, 0, "slice should start from the beginning");
}
3338
3339 #[tokio::test]
3345 async fn regression_3168_complete_tool_pair_survives_round_trip() {
3346 use zeph_llm::provider::MessagePart;
3347
3348 let provider = mock_provider(vec![]);
3349 let channel = MockChannel::new(vec![]);
3350 let registry = create_test_registry();
3351 let executor = MockToolExecutor::no_tools();
3352
3353 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3354 let cid = memory.sqlite().create_conversation().await.unwrap();
3355 let sqlite = memory.sqlite();
3356
3357 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3358 id: "r3168_call".to_string(),
3359 name: "shell".to_string(),
3360 input: serde_json::json!({"command": "echo hi"}),
3361 }])
3362 .unwrap();
3363 sqlite
3364 .save_message_with_parts(
3365 cid,
3366 "assistant",
3367 "[tool_use: shell(r3168_call)]",
3368 &use_parts,
3369 )
3370 .await
3371 .unwrap();
3372
3373 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3374 tool_use_id: "r3168_call".to_string(),
3375 content: "[skipped]".to_string(),
3376 is_error: false,
3377 }])
3378 .unwrap();
3379 sqlite
3380 .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3381 .await
3382 .unwrap();
3383
3384 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3385 std::sync::Arc::new(memory),
3386 cid,
3387 50,
3388 5,
3389 100,
3390 );
3391
3392 let base = agent.msg.messages.len();
3393 agent.load_history().await.unwrap();
3394
3395 assert_eq!(
3396 agent.msg.messages.len(),
3397 base + 2,
3398 "both messages of the complete pair must survive load_history"
3399 );
3400
3401 let assistant_msg = agent
3402 .msg
3403 .messages
3404 .iter()
3405 .find(|m| m.role == Role::Assistant)
3406 .expect("assistant message missing after load_history");
3407 assert!(
3408 assistant_msg
3409 .parts
3410 .iter()
3411 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3412 "ToolUse part must be preserved in assistant message"
3413 );
3414
3415 let user_msg = agent
3416 .msg
3417 .messages
3418 .iter()
3419 .rev()
3420 .find(|m| m.role == Role::User)
3421 .expect("user message missing after load_history");
3422 assert!(
3423 user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3424 "ToolResult part must be preserved in user message"
3425 );
3426 }
3427
3428 #[tokio::test]
3433 async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3434 use zeph_llm::provider::MessagePart;
3435
3436 let provider = mock_provider(vec![]);
3437 let channel = MockChannel::new(vec![]);
3438 let registry = create_test_registry();
3439 let executor = MockToolExecutor::no_tools();
3440
3441 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3442 let cid = memory.sqlite().create_conversation().await.unwrap();
3443 let sqlite = memory.sqlite();
3444
3445 sqlite
3449 .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3450 .await
3451 .unwrap();
3452
3453 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3455 tool_use_id: "corrupt".to_string(),
3456 content: "result".to_string(),
3457 is_error: false,
3458 }])
3459 .unwrap();
3460 sqlite
3461 .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3462 .await
3463 .unwrap();
3464
3465 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3466 std::sync::Arc::new(memory),
3467 cid,
3468 50,
3469 5,
3470 100,
3471 );
3472
3473 let base = agent.msg.messages.len();
3474 agent.load_history().await.unwrap();
3475
3476 let loaded = agent.msg.messages.len() - base;
3481 let orphan_present = agent.msg.messages.iter().any(|m| {
3484 m.role == Role::User
3485 && m.parts.iter().any(|p| {
3486 matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3487 })
3488 });
3489 assert!(
3490 !orphan_present,
3491 "orphaned ToolResult must not survive load_history; loaded={loaded}"
3492 );
3493 }
3494}