1use crate::channel::Channel;
5use zeph_agent_persistence::graph::{build_graph_extraction_config, collect_context_messages};
6use zeph_agent_persistence::{
7 LoadHistoryParams, MemoryPersistenceView, MetricsView, PersistMessageRequest,
8 PersistenceService, SecurityView,
9};
10use zeph_llm::provider::{LlmProvider as _, MessagePart, Role};
11
12use super::Agent;
13
14impl<C: Channel> Agent<C> {
15 #[tracing::instrument(name = "core.persist.load_history", skip_all, level = "debug", err)]
31 pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
32 let (Some(memory), Some(cid)) = (
33 self.services.memory.persistence.memory.as_ref(),
34 self.services.memory.persistence.conversation_id,
35 ) else {
36 return Ok(());
37 };
38
39 let memory = memory.clone();
41
42 let mut unsummarized = self.services.memory.persistence.unsummarized_count;
43 let memory_view = MemoryPersistenceView {
46 memory: Some(&memory),
47 conversation_id: self.services.memory.persistence.conversation_id,
48 autosave_assistant: self.services.memory.persistence.autosave_assistant,
49 autosave_min_length: self.services.memory.persistence.autosave_min_length,
50 unsummarized_count: &mut unsummarized,
51 goal_text: self.services.memory.extraction.goal_text.clone(),
52 };
53
54 let svc = PersistenceService::new();
55 let outcome = svc
56 .load_history(LoadHistoryParams {
57 messages: &mut self.msg.messages,
58 last_persisted_message_id: &mut self.msg.last_persisted_message_id,
59 deferred_hide_ids: &mut self.msg.deferred_db_hide_ids,
60 memory_view: &memory_view,
61 })
62 .await
63 .map_err(|e| {
64 super::error::AgentError::Memory(zeph_memory::MemoryError::Other(e.to_string()))
65 })?;
66
67 self.services.memory.persistence.unsummarized_count = unsummarized;
69
70 if outcome.messages_loaded > 0 {
71 let _ = memory
73 .sqlite()
74 .increment_session_counts_for_conversation(cid)
75 .await
76 .inspect_err(|e| {
77 tracing::warn!(error = %e, "failed to increment tier session counts");
78 });
79 }
80
81 self.update_metrics(|m| {
83 m.sqlite_message_count = outcome.sqlite_total_messages;
84 });
85 if let Ok(count) = memory.sqlite().count_semantic_facts().await {
86 let count_u64 = u64::try_from(count).unwrap_or(0);
87 self.update_metrics(|m| {
88 m.semantic_fact_count = count_u64;
89 });
90 }
91 if let Ok(count) = memory.unsummarized_message_count(cid).await {
92 self.services.memory.persistence.unsummarized_count =
93 usize::try_from(count).unwrap_or(0);
94 }
95
96 self.recompute_prompt_tokens();
97 Ok(())
98 }
99
100 #[tracing::instrument(name = "core.persist.persist_message", skip_all, level = "debug")]
106 pub(crate) async fn persist_message(
107 &mut self,
108 role: Role,
109 content: &str,
110 parts: &[MessagePart],
111 has_injection_flags: bool,
112 ) {
113 let guard_event = self
117 .services
118 .security
119 .exfiltration_guard
120 .should_guard_memory_write(has_injection_flags);
121 if let Some(ref event) = guard_event {
122 tracing::warn!(
123 ?event,
124 "exfiltration guard: skipping Qdrant embedding for flagged content"
125 );
126 self.push_security_event(
127 zeph_common::SecurityEventCategory::ExfiltrationBlock,
128 "memory_write",
129 "Qdrant embedding skipped: flagged content",
130 );
131 }
132
133 let req = PersistMessageRequest::from_borrowed(role, content, parts, has_injection_flags);
134
135 let mut unsummarized = self.services.memory.persistence.unsummarized_count;
136 let memory_arc = self.services.memory.persistence.memory.clone();
137 let mut memory_view = MemoryPersistenceView {
138 memory: memory_arc.as_ref(),
139 conversation_id: self.services.memory.persistence.conversation_id,
140 autosave_assistant: self.services.memory.persistence.autosave_assistant,
141 autosave_min_length: self.services.memory.persistence.autosave_min_length,
142 unsummarized_count: &mut unsummarized,
143 goal_text: self.services.memory.extraction.goal_text.clone(),
144 };
145 let security = SecurityView {
146 guard_memory_writes: guard_event.is_some(),
147 _phantom: std::marker::PhantomData,
148 };
149 let mut sqlite_delta = 0u64;
150 let mut embed_delta = 0u64;
151 let mut guard_delta = 0u64;
152 let mut metrics_view = MetricsView {
153 sqlite_message_count: &mut sqlite_delta,
154 embeddings_generated: &mut embed_delta,
155 exfiltration_memory_guards: &mut guard_delta,
156 };
157
158 let svc = PersistenceService::new();
159 let outcome = svc
160 .persist_message(
161 req,
162 &mut self.msg.last_persisted_message_id,
163 &mut memory_view,
164 &security,
165 &mut metrics_view,
166 )
167 .await;
168
169 self.services.memory.persistence.unsummarized_count = unsummarized;
171
172 self.update_metrics(|m| {
174 m.sqlite_message_count += sqlite_delta;
175 m.embeddings_generated += embed_delta;
176 m.exfiltration_memory_guards += guard_delta;
178 });
179
180 if outcome.message_id.is_none() {
181 return;
182 }
183
184 self.enqueue_summarization_task();
188
189 let has_tool_result_parts = parts
192 .iter()
193 .any(|p| matches!(p, MessagePart::ToolResult { .. }));
194
195 self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
196 .await;
197
198 if role == Role::User && !has_tool_result_parts && !has_injection_flags {
200 self.enqueue_persona_extraction_task();
201 }
202
203 if has_tool_result_parts {
205 self.enqueue_trajectory_extraction_task();
206 }
207
208 let has_tool_use_parts = parts
213 .iter()
214 .any(|p| matches!(p, MessagePart::ToolUse { .. }));
215 if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
216 self.enqueue_reasoning_extraction_task();
217 self.enqueue_memcot_distill_task(content);
219 }
220 }
221
222 fn enqueue_memcot_distill_task(&mut self, assistant_content: &str) {
227 let Some(accumulator) = &self.services.memory.extraction.memcot_accumulator else {
228 return;
229 };
230 let distill_provider_name = self
231 .services
232 .memory
233 .extraction
234 .memcot_config
235 .distill_provider
236 .as_str();
237 let provider = self.resolve_background_provider(distill_provider_name);
238
239 let content = assistant_content.to_owned();
240 let supervisor = &mut self.runtime.lifecycle.supervisor;
241
242 accumulator.maybe_enqueue_distill(&content, provider, |name, fut| {
243 supervisor.spawn(super::agent_supervisor::TaskClass::Enrichment, name, fut);
244 });
245 }
246
247 fn enqueue_summarization_task(&mut self) {
249 let (Some(memory), Some(cid)) = (
250 self.services.memory.persistence.memory.clone(),
251 self.services.memory.persistence.conversation_id,
252 ) else {
253 return;
254 };
255
256 if self.services.memory.persistence.unsummarized_count
257 <= self.services.memory.compaction.summarization_threshold
258 {
259 return;
260 }
261
262 let batch_size = self.services.memory.compaction.summarization_threshold / 2;
263
264 self.runtime.lifecycle.supervisor.spawn_summarization("summarization", async move {
265 match tokio::time::timeout(
266 std::time::Duration::from_secs(30),
267 memory.summarize(cid, batch_size),
268 )
269 .await
270 {
271 Ok(Ok(Some(summary_id))) => {
272 tracing::info!(
273 "background summarization: created summary {summary_id} for conversation {cid}"
274 );
275 true
276 }
277 Ok(Ok(None)) => {
278 tracing::debug!("background summarization: no summarization needed");
279 false
280 }
281 Ok(Err(e)) => {
282 tracing::error!("background summarization failed: {e:#}");
283 false
284 }
285 Err(_) => {
286 tracing::warn!("background summarization timed out after 30s");
287 false
288 }
289 }
290 });
291 }
292
293 #[tracing::instrument(
298 name = "core.persist.enqueue_graph_extraction",
299 skip_all,
300 level = "debug"
301 )]
302 async fn enqueue_graph_extraction_task(
303 &mut self,
304 content: &str,
305 has_injection_flags: bool,
306 has_tool_result_parts: bool,
307 ) {
308 if self.services.memory.persistence.memory.is_none()
309 || self.services.memory.persistence.conversation_id.is_none()
310 {
311 return;
312 }
313 if has_tool_result_parts {
314 tracing::debug!("graph extraction skipped: message contains ToolResult parts");
315 return;
316 }
317 if has_injection_flags {
318 tracing::warn!("graph extraction skipped: injection patterns detected in content");
319 return;
320 }
321
322 let cfg = &self.services.memory.extraction.graph_config;
323 if !cfg.enabled {
324 return;
325 }
326 let extraction_cfg = build_graph_extraction_config(
327 cfg,
328 self.services
329 .memory
330 .persistence
331 .conversation_id
332 .map(|c| c.0),
333 );
334 let extract_provider_name = cfg.extract_provider.as_str().to_owned();
337
338 if self.rpe_should_skip(content).await {
341 tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
342 return;
343 }
344
345 let context_messages = collect_context_messages(&self.msg.messages);
346
347 let Some(memory) = self.services.memory.persistence.memory.clone() else {
348 return;
349 };
350
351 let validator: zeph_memory::semantic::PostExtractValidator =
352 if self.services.security.memory_validator.is_enabled() {
353 let v = self.services.security.memory_validator.clone();
354 Some(Box::new(move |result| {
355 v.validate_graph_extraction(result)
356 .map_err(|e| e.to_string())
357 }))
358 } else {
359 None
360 };
361
362 let provider_override = if extract_provider_name.is_empty() {
363 None
364 } else {
365 Some(self.resolve_background_provider(&extract_provider_name))
366 };
367
368 self.spawn_graph_extraction_task(
369 memory,
370 content,
371 context_messages,
372 extraction_cfg,
373 validator,
374 provider_override,
375 );
376
377 self.sync_community_detection_failures();
379 self.sync_graph_extraction_metrics();
380 self.enqueue_graph_count_sync_task();
381 }
382
383 fn spawn_graph_extraction_task(
384 &mut self,
385 memory: std::sync::Arc<zeph_memory::semantic::SemanticMemory>,
386 content: &str,
387 context_messages: Vec<String>,
388 extraction_cfg: zeph_memory::semantic::GraphExtractionConfig,
389 validator: zeph_memory::semantic::PostExtractValidator,
390 provider_override: Option<zeph_llm::any::AnyProvider>,
391 ) {
392 let content_owned = content.to_owned();
393 let graph_store = memory.graph_store.clone();
394 let metrics_tx = self.runtime.metrics.metrics_tx.clone();
395 let start_time = self.runtime.lifecycle.start_time;
396
397 self.runtime.lifecycle.supervisor.spawn(
398 super::agent_supervisor::TaskClass::Enrichment,
399 "graph_extraction",
400 async move {
401 let extraction_handle = memory.spawn_graph_extraction(
402 content_owned,
403 context_messages,
404 extraction_cfg,
405 validator,
406 provider_override,
407 );
408
409 if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
411 let _ = extraction_handle.await;
412 let (entities, edges, communities) = tokio::join!(
413 store.entity_count(),
414 store.active_edge_count(),
415 store.community_count()
416 );
417 let elapsed = start_time.elapsed().as_secs();
418 tx.send_modify(|m| {
419 m.uptime_seconds = elapsed;
420 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
421 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
422 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
423 });
424 } else {
425 let _ = extraction_handle.await;
426 }
427
428 tracing::debug!("background graph extraction complete");
429 },
430 );
431 }
432
433 fn enqueue_graph_count_sync_task(&mut self) {
435 let memory_for_sync = self.services.memory.persistence.memory.clone();
436 let metrics_tx_sync = self.runtime.metrics.metrics_tx.clone();
437 let start_time_sync = self.runtime.lifecycle.start_time;
438 let cid_sync = self.services.memory.persistence.conversation_id;
439 let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
440 let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
441 let guidelines_enabled = self.services.memory.extraction.graph_config.enabled;
442
443 self.runtime.lifecycle.supervisor.spawn(
444 super::agent_supervisor::TaskClass::Telemetry,
445 "graph_count_sync",
446 async move {
447 let Some(store) = graph_store_sync else {
448 return;
449 };
450 let Some(tx) = metrics_tx_sync else { return };
451
452 let (entities, edges, communities) = tokio::join!(
453 store.entity_count(),
454 store.active_edge_count(),
455 store.community_count()
456 );
457 let elapsed = start_time_sync.elapsed().as_secs();
458 tx.send_modify(|m| {
459 m.uptime_seconds = elapsed;
460 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
461 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
462 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
463 });
464
465 if guidelines_enabled && let Some(sqlite) = sqlite_sync {
467 match tokio::time::timeout(
468 std::time::Duration::from_secs(10),
469 sqlite.load_compression_guidelines_meta(cid_sync),
470 )
471 .await
472 {
473 Ok(Ok((version, created_at))) => {
474 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
475 let version_u32 = u32::try_from(version).unwrap_or(0);
476 tx.send_modify(|m| {
477 m.guidelines_version = version_u32;
478 m.guidelines_updated_at = created_at;
479 });
480 }
481 Ok(Err(e)) => {
482 tracing::debug!("guidelines status sync failed: {e:#}");
483 }
484 Err(_) => {
485 tracing::debug!("guidelines status sync timed out");
486 }
487 }
488 }
489 },
490 );
491 }
492
493 fn enqueue_persona_extraction_task(&mut self) {
495 use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
496
497 let cfg = &self.services.memory.extraction.persona_config;
498 if !cfg.enabled {
499 return;
500 }
501
502 let Some(memory) = &self.services.memory.persistence.memory else {
503 return;
504 };
505
506 let user_messages: Vec<String> = self
507 .msg
508 .messages
509 .iter()
510 .filter(|m| {
511 m.role == Role::User
512 && !m
513 .parts
514 .iter()
515 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
516 })
517 .take(8)
518 .map(|m| {
519 if m.content.len() > 2048 {
520 m.content[..m.content.floor_char_boundary(2048)].to_owned()
521 } else {
522 m.content.clone()
523 }
524 })
525 .collect();
526
527 if user_messages.len() < cfg.min_messages {
528 return;
529 }
530
531 let timeout_secs = cfg.extraction_timeout_secs;
532 let extraction_cfg = PersonaExtractionConfig {
533 enabled: cfg.enabled,
534 min_messages: cfg.min_messages,
535 max_messages: cfg.max_messages,
536 extraction_timeout_secs: timeout_secs,
537 };
538
539 let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
540 let store = memory.sqlite().clone();
541 let conversation_id = self
542 .services
543 .memory
544 .persistence
545 .conversation_id
546 .map(|c| c.0);
547
548 self.runtime.lifecycle.supervisor.spawn(
549 super::agent_supervisor::TaskClass::Enrichment,
550 "persona_extraction",
551 async move {
552 let user_message_refs: Vec<&str> =
553 user_messages.iter().map(String::as_str).collect();
554 let fut = extract_persona_facts(
555 &store,
556 &provider,
557 &user_message_refs,
558 &extraction_cfg,
559 conversation_id,
560 );
561 match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
562 {
563 Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
564 Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
565 Err(_) => tracing::warn!(
566 timeout_secs,
567 "persona extraction timed out — no facts written this turn"
568 ),
569 }
570 },
571 );
572 }
573
574 fn enqueue_trajectory_extraction_task(&mut self) {
576 use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
577
578 let cfg = self.services.memory.extraction.trajectory_config.clone();
579 if !cfg.enabled {
580 return;
581 }
582
583 let Some(memory) = &self.services.memory.persistence.memory else {
584 return;
585 };
586
587 let conversation_id = match self.services.memory.persistence.conversation_id {
588 Some(cid) => cid.0,
589 None => return,
590 };
591
592 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
593 let turn_messages: Vec<zeph_llm::provider::Message> =
594 self.msg.messages[tail_start..].to_vec();
595
596 if turn_messages.is_empty() {
597 return;
598 }
599
600 let extraction_cfg = TrajectoryExtractionConfig {
601 enabled: cfg.enabled,
602 max_messages: cfg.max_messages,
603 extraction_timeout_secs: cfg.extraction_timeout_secs,
604 };
605
606 let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
607 let store = memory.sqlite().clone();
608 let min_confidence = cfg.min_confidence;
609
610 self.runtime.lifecycle.supervisor.spawn(
611 super::agent_supervisor::TaskClass::Enrichment,
612 "trajectory_extraction",
613 async move {
614 let entries =
615 match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
616 .await
617 {
618 Ok(e) => e,
619 Err(e) => {
620 tracing::warn!(error = %e, "trajectory extraction failed");
621 return;
622 }
623 };
624
625 let last_id = store
626 .trajectory_last_extracted_message_id(conversation_id)
627 .await
628 .unwrap_or(0);
629
630 let mut max_id = last_id;
631 for entry in &entries {
632 if entry.confidence < min_confidence {
633 continue;
634 }
635 let tools_json = serde_json::to_string(&entry.tools_used)
636 .unwrap_or_else(|_| "[]".to_string());
637 match store
638 .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
639 conversation_id: Some(conversation_id),
640 turn_index: 0,
641 kind: &entry.kind,
642 intent: &entry.intent,
643 outcome: &entry.outcome,
644 tools_used: &tools_json,
645 confidence: entry.confidence,
646 })
647 .await
648 {
649 Ok(id) => {
650 if id > max_id {
651 max_id = id;
652 }
653 }
654 Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
655 }
656 }
657
658 if max_id > last_id {
659 let _ = store
660 .set_trajectory_last_extracted_message_id(conversation_id, max_id)
661 .await;
662 }
663
664 tracing::debug!(
665 count = entries.len(),
666 conversation_id,
667 "trajectory extraction complete"
668 );
669 },
670 );
671 }
672
673 fn enqueue_reasoning_extraction_task(&mut self) {
678 let cfg = self.services.memory.extraction.reasoning_config.clone();
679 if !cfg.enabled {
680 return;
681 }
682
683 let Some(memory) = &self.services.memory.persistence.memory else {
684 return;
685 };
686
687 let Some(reasoning) = memory.reasoning.clone() else {
688 return;
689 };
690
691 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
692 let turn_messages: Vec<zeph_llm::provider::Message> =
693 self.msg.messages[tail_start..].to_vec();
694
695 if turn_messages.len() < cfg.min_messages {
696 return;
697 }
698
699 let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
700 let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
701 let embed_provider = memory.effective_embed_provider().clone();
702 let store_limit = cfg.store_limit;
703 let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
704 let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
705 let self_judge_window = cfg.self_judge_window;
706 let min_assistant_chars = cfg.min_assistant_chars;
707
708 self.runtime.lifecycle.supervisor.spawn(
709 super::agent_supervisor::TaskClass::Enrichment,
710 "reasoning_extraction",
711 async move {
712 if let Err(e) = zeph_memory::process_reasoning_turn(
713 &reasoning,
714 &extract_provider,
715 &distill_provider,
716 &embed_provider,
717 &turn_messages,
718 zeph_memory::ProcessTurnConfig {
719 store_limit,
720 extraction_timeout,
721 distill_timeout,
722 self_judge_window,
723 min_assistant_chars,
724 },
725 )
726 .await
727 {
728 tracing::warn!(error = %e, "reasoning: process_turn failed");
729 }
730
731 tracing::debug!("reasoning extraction complete");
732 },
733 );
734 }
735
736 async fn rpe_should_skip(&mut self, content: &str) -> bool {
741 let Some(ref rpe_mutex) = self.services.memory.extraction.rpe_router else {
742 return false;
743 };
744 let Some(memory) = &self.services.memory.persistence.memory else {
745 return false;
746 };
747 let candidates = zeph_memory::extract_candidate_entities(content);
748 let provider = memory.provider();
749 let Ok(Ok(emb_vec)) =
750 tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
751 else {
752 return false; };
754 if let Ok(mut router) = rpe_mutex.lock() {
755 let signal = router.compute(&emb_vec, &candidates);
756 router.push_embedding(emb_vec);
757 router.push_entities(&candidates);
758 !signal.should_extract
759 } else {
760 tracing::warn!("rpe_router mutex poisoned; falling through to extract");
761 false
762 }
763 }
764}
765
766#[cfg(test)]
767mod tests {
768 use super::super::agent_tests::{
769 MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
770 };
771 use super::*;
772 use zeph_llm::any::AnyProvider;
773 use zeph_llm::provider::Message;
774 use zeph_memory::semantic::SemanticMemory;
775
776 async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
777 SemanticMemory::new(
778 ":memory:",
779 "http://127.0.0.1:1",
780 None,
781 provider.clone(),
782 "test-model",
783 )
784 .await
785 .unwrap()
786 }
787
788 #[tokio::test]
789 async fn load_history_without_memory_returns_ok() {
790 let provider = mock_provider(vec![]);
791 let channel = MockChannel::new(vec![]);
792 let registry = create_test_registry();
793 let executor = MockToolExecutor::no_tools();
794 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
795
796 let result = agent.load_history().await;
797 assert!(result.is_ok());
798 assert_eq!(agent.msg.messages.len(), 1); }
801
802 #[tokio::test]
803 async fn load_history_with_messages_injects_into_agent() {
804 let provider = mock_provider(vec![]);
805 let channel = MockChannel::new(vec![]);
806 let registry = create_test_registry();
807 let executor = MockToolExecutor::no_tools();
808
809 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
810 let cid = memory.sqlite().create_conversation().await.unwrap();
811
812 memory
813 .sqlite()
814 .save_message(cid, "user", "hello from history")
815 .await
816 .unwrap();
817 memory
818 .sqlite()
819 .save_message(cid, "assistant", "hi back")
820 .await
821 .unwrap();
822
823 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
824 std::sync::Arc::new(memory),
825 cid,
826 50,
827 5,
828 100,
829 );
830
831 let messages_before = agent.msg.messages.len();
832 agent.load_history().await.unwrap();
833 assert_eq!(agent.msg.messages.len(), messages_before + 2);
835 }
836
837 #[tokio::test]
838 async fn load_history_skips_empty_messages() {
839 let provider = mock_provider(vec![]);
840 let channel = MockChannel::new(vec![]);
841 let registry = create_test_registry();
842 let executor = MockToolExecutor::no_tools();
843
844 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
845 let cid = memory.sqlite().create_conversation().await.unwrap();
846
847 memory
849 .sqlite()
850 .save_message(cid, "user", " ")
851 .await
852 .unwrap();
853 memory
854 .sqlite()
855 .save_message(cid, "user", "real message")
856 .await
857 .unwrap();
858
859 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
860 std::sync::Arc::new(memory),
861 cid,
862 50,
863 5,
864 100,
865 );
866
867 let messages_before = agent.msg.messages.len();
868 agent.load_history().await.unwrap();
869 assert_eq!(agent.msg.messages.len(), messages_before + 1);
871 }
872
873 #[tokio::test]
874 async fn load_history_with_empty_store_returns_ok() {
875 let provider = mock_provider(vec![]);
876 let channel = MockChannel::new(vec![]);
877 let registry = create_test_registry();
878 let executor = MockToolExecutor::no_tools();
879
880 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
881 let cid = memory.sqlite().create_conversation().await.unwrap();
882
883 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
884 std::sync::Arc::new(memory),
885 cid,
886 50,
887 5,
888 100,
889 );
890
891 let messages_before = agent.msg.messages.len();
892 agent.load_history().await.unwrap();
893 assert_eq!(agent.msg.messages.len(), messages_before);
895 }
896
897 #[tokio::test]
898 async fn load_history_increments_session_count_for_existing_messages() {
899 let provider = mock_provider(vec![]);
900 let channel = MockChannel::new(vec![]);
901 let registry = create_test_registry();
902 let executor = MockToolExecutor::no_tools();
903
904 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
905 let cid = memory.sqlite().create_conversation().await.unwrap();
906
907 let id1 = memory
909 .sqlite()
910 .save_message(cid, "user", "hello")
911 .await
912 .unwrap();
913 let id2 = memory
914 .sqlite()
915 .save_message(cid, "assistant", "hi")
916 .await
917 .unwrap();
918
919 let memory_arc = std::sync::Arc::new(memory);
920 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
921 memory_arc.clone(),
922 cid,
923 50,
924 5,
925 100,
926 );
927
928 agent.load_history().await.unwrap();
929
930 let counts: Vec<i64> = zeph_db::query_scalar(
932 "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
933 )
934 .bind(id1)
935 .bind(id2)
936 .fetch_all(memory_arc.sqlite().pool())
937 .await
938 .unwrap();
939 assert_eq!(
940 counts,
941 vec![1, 1],
942 "session_count must be 1 after first restore"
943 );
944 }
945
946 #[tokio::test]
947 async fn load_history_does_not_increment_session_count_for_new_conversation() {
948 let provider = mock_provider(vec![]);
949 let channel = MockChannel::new(vec![]);
950 let registry = create_test_registry();
951 let executor = MockToolExecutor::no_tools();
952
953 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
954 let cid = memory.sqlite().create_conversation().await.unwrap();
955
956 let memory_arc = std::sync::Arc::new(memory);
958 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
959 memory_arc.clone(),
960 cid,
961 50,
962 5,
963 100,
964 );
965
966 agent.load_history().await.unwrap();
967
968 let counts: Vec<i64> =
970 zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
971 .bind(cid)
972 .fetch_all(memory_arc.sqlite().pool())
973 .await
974 .unwrap();
975 assert!(counts.is_empty(), "new conversation must have no messages");
976 }
977
978 #[tokio::test]
979 async fn persist_message_without_memory_silently_returns() {
980 let provider = mock_provider(vec![]);
982 let channel = MockChannel::new(vec![]);
983 let registry = create_test_registry();
984 let executor = MockToolExecutor::no_tools();
985 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
986
987 agent.persist_message(Role::User, "hello", &[], false).await;
989 }
990
991 #[tokio::test]
992 async fn persist_message_assistant_autosave_false_uses_save_only() {
993 let provider = mock_provider(vec![]);
994 let channel = MockChannel::new(vec![]);
995 let registry = create_test_registry();
996 let executor = MockToolExecutor::no_tools();
997
998 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
999 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1000 let cid = memory.sqlite().create_conversation().await.unwrap();
1001
1002 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1003 .with_metrics(tx)
1004 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1005 agent.services.memory.persistence.autosave_assistant = false;
1006 agent.services.memory.persistence.autosave_min_length = 20;
1007
1008 agent
1009 .persist_message(Role::Assistant, "short assistant reply", &[], false)
1010 .await;
1011
1012 let history = agent
1013 .services
1014 .memory
1015 .persistence
1016 .memory
1017 .as_ref()
1018 .unwrap()
1019 .sqlite()
1020 .load_history(cid, 50)
1021 .await
1022 .unwrap();
1023 assert_eq!(history.len(), 1, "message must be saved");
1024 assert_eq!(history[0].content, "short assistant reply");
1025 assert_eq!(rx.borrow().embeddings_generated, 0);
1027 }
1028
1029 #[tokio::test]
1030 async fn persist_message_assistant_below_min_length_uses_save_only() {
1031 let provider = mock_provider(vec![]);
1032 let channel = MockChannel::new(vec![]);
1033 let registry = create_test_registry();
1034 let executor = MockToolExecutor::no_tools();
1035
1036 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1037 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1038 let cid = memory.sqlite().create_conversation().await.unwrap();
1039
1040 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1042 .with_metrics(tx)
1043 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1044 agent.services.memory.persistence.autosave_assistant = true;
1045 agent.services.memory.persistence.autosave_min_length = 1000;
1046
1047 agent
1048 .persist_message(Role::Assistant, "too short", &[], false)
1049 .await;
1050
1051 let history = agent
1052 .services
1053 .memory
1054 .persistence
1055 .memory
1056 .as_ref()
1057 .unwrap()
1058 .sqlite()
1059 .load_history(cid, 50)
1060 .await
1061 .unwrap();
1062 assert_eq!(history.len(), 1, "message must be saved");
1063 assert_eq!(history[0].content, "too short");
1064 assert_eq!(rx.borrow().embeddings_generated, 0);
1065 }
1066
1067 #[tokio::test]
1068 async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1069 let provider = mock_provider(vec![]);
1071 let channel = MockChannel::new(vec![]);
1072 let registry = create_test_registry();
1073 let executor = MockToolExecutor::no_tools();
1074
1075 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1076 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1077 let cid = memory.sqlite().create_conversation().await.unwrap();
1078
1079 let min_length = 10usize;
1080 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1081 .with_metrics(tx)
1082 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1083 agent.services.memory.persistence.autosave_assistant = true;
1084 agent.services.memory.persistence.autosave_min_length = min_length;
1085
1086 let content_at_boundary = "A".repeat(min_length);
1088 assert_eq!(content_at_boundary.len(), min_length);
1089 agent
1090 .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1091 .await;
1092
1093 assert_eq!(rx.borrow().sqlite_message_count, 1);
1095 }
1096
1097 #[tokio::test]
1098 async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1099 let provider = mock_provider(vec![]);
1101 let channel = MockChannel::new(vec![]);
1102 let registry = create_test_registry();
1103 let executor = MockToolExecutor::no_tools();
1104
1105 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1106 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1107 let cid = memory.sqlite().create_conversation().await.unwrap();
1108
1109 let min_length = 10usize;
1110 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1111 .with_metrics(tx)
1112 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1113 agent.services.memory.persistence.autosave_assistant = true;
1114 agent.services.memory.persistence.autosave_min_length = min_length;
1115
1116 let content_below_boundary = "A".repeat(min_length - 1);
1118 assert_eq!(content_below_boundary.len(), min_length - 1);
1119 agent
1120 .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1121 .await;
1122
1123 let history = agent
1124 .services
1125 .memory
1126 .persistence
1127 .memory
1128 .as_ref()
1129 .unwrap()
1130 .sqlite()
1131 .load_history(cid, 50)
1132 .await
1133 .unwrap();
1134 assert_eq!(history.len(), 1, "message must still be saved");
1135 assert_eq!(rx.borrow().embeddings_generated, 0);
1137 }
1138
1139 #[tokio::test]
1140 async fn persist_message_increments_unsummarized_count() {
1141 let provider = mock_provider(vec![]);
1142 let channel = MockChannel::new(vec![]);
1143 let registry = create_test_registry();
1144 let executor = MockToolExecutor::no_tools();
1145
1146 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1147 let cid = memory.sqlite().create_conversation().await.unwrap();
1148
1149 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1151 std::sync::Arc::new(memory),
1152 cid,
1153 50,
1154 5,
1155 100,
1156 );
1157
1158 assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1159
1160 agent.persist_message(Role::User, "first", &[], false).await;
1161 assert_eq!(agent.services.memory.persistence.unsummarized_count, 1);
1162
1163 agent
1164 .persist_message(Role::User, "second", &[], false)
1165 .await;
1166 assert_eq!(agent.services.memory.persistence.unsummarized_count, 2);
1167 }
1168
1169 #[tokio::test]
1170 async fn check_summarization_resets_counter_on_success() {
1171 let provider = mock_provider(vec![]);
1172 let channel = MockChannel::new(vec![]);
1173 let registry = create_test_registry();
1174 let executor = MockToolExecutor::no_tools();
1175
1176 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1177 let cid = memory.sqlite().create_conversation().await.unwrap();
1178
1179 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1181 std::sync::Arc::new(memory),
1182 cid,
1183 50,
1184 5,
1185 1,
1186 );
1187
1188 agent.persist_message(Role::User, "msg1", &[], false).await;
1189 agent.persist_message(Role::User, "msg2", &[], false).await;
1190
1191 assert!(agent.services.memory.persistence.unsummarized_count <= 2);
1196 }
1197
1198 #[tokio::test]
1199 async fn unsummarized_count_not_incremented_without_memory() {
1200 let provider = mock_provider(vec![]);
1201 let channel = MockChannel::new(vec![]);
1202 let registry = create_test_registry();
1203 let executor = MockToolExecutor::no_tools();
1204 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1205
1206 agent.persist_message(Role::User, "hello", &[], false).await;
1207 assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1209 }
1210
1211 mod graph_extraction_guards {
1213 use super::*;
1214 use crate::config::GraphConfig;
1215 use zeph_llm::provider::MessageMetadata;
1216 use zeph_memory::graph::GraphStore;
1217
1218 fn enabled_graph_config() -> GraphConfig {
1219 GraphConfig {
1220 enabled: true,
1221 ..GraphConfig::default()
1222 }
1223 }
1224
1225 async fn agent_with_graph(
1226 provider: &AnyProvider,
1227 config: GraphConfig,
1228 ) -> Agent<MockChannel> {
1229 let memory =
1230 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1231 let cid = memory.sqlite().create_conversation().await.unwrap();
1232 Agent::new(
1233 provider.clone(),
1234 MockChannel::new(vec![]),
1235 create_test_registry(),
1236 None,
1237 5,
1238 MockToolExecutor::no_tools(),
1239 )
1240 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1241 .with_graph_config(config)
1242 }
1243
1244 #[tokio::test]
1245 async fn injection_flag_guard_skips_extraction() {
1246 let provider = mock_provider(vec![]);
1248 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1249 let pool = agent
1250 .services
1251 .memory
1252 .persistence
1253 .memory
1254 .as_ref()
1255 .unwrap()
1256 .sqlite()
1257 .pool()
1258 .clone();
1259
1260 agent
1261 .enqueue_graph_extraction_task("I use Rust", true, false)
1262 .await;
1263
1264 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1266
1267 let store = GraphStore::new(pool);
1268 let count = store.get_metadata("extraction_count").await.unwrap();
1269 assert!(
1270 count.is_none(),
1271 "injection flag must prevent extraction_count from being written"
1272 );
1273 }
1274
1275 #[tokio::test]
1276 async fn disabled_config_guard_skips_extraction() {
1277 let provider = mock_provider(vec![]);
1279 let disabled_cfg = GraphConfig {
1280 enabled: false,
1281 ..GraphConfig::default()
1282 };
1283 let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1284 let pool = agent
1285 .services
1286 .memory
1287 .persistence
1288 .memory
1289 .as_ref()
1290 .unwrap()
1291 .sqlite()
1292 .pool()
1293 .clone();
1294
1295 agent
1296 .enqueue_graph_extraction_task("I use Rust", false, false)
1297 .await;
1298
1299 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1300
1301 let store = GraphStore::new(pool);
1302 let count = store.get_metadata("extraction_count").await.unwrap();
1303 assert!(
1304 count.is_none(),
1305 "disabled graph config must prevent extraction"
1306 );
1307 }
1308
1309 #[tokio::test]
1310 async fn happy_path_fires_extraction() {
1311 let provider = mock_provider(vec![]);
1314 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1315 let pool = agent
1316 .services
1317 .memory
1318 .persistence
1319 .memory
1320 .as_ref()
1321 .unwrap()
1322 .sqlite()
1323 .pool()
1324 .clone();
1325
1326 agent
1327 .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1328 .await;
1329
1330 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1332
1333 let store = GraphStore::new(pool);
1334 let count = store.get_metadata("extraction_count").await.unwrap();
1335 assert!(
1336 count.is_some(),
1337 "happy-path extraction must increment extraction_count"
1338 );
1339 }
1340
1341 #[tokio::test]
1342 async fn tool_result_parts_guard_skips_extraction() {
1343 let provider = mock_provider(vec![]);
1347 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1348 let pool = agent
1349 .services
1350 .memory
1351 .persistence
1352 .memory
1353 .as_ref()
1354 .unwrap()
1355 .sqlite()
1356 .pool()
1357 .clone();
1358
1359 agent
1360 .enqueue_graph_extraction_task(
1361 "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1362 false,
1363 true, )
1365 .await;
1366
1367 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1368
1369 let store = GraphStore::new(pool);
1370 let count = store.get_metadata("extraction_count").await.unwrap();
1371 assert!(
1372 count.is_none(),
1373 "tool result message must not trigger graph extraction"
1374 );
1375 }
1376
1377 #[tokio::test]
1378 async fn context_filter_excludes_tool_result_messages() {
1379 let provider = mock_provider(vec![]);
1390 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1391
1392 agent.msg.messages.push(Message {
1395 role: Role::User,
1396 content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1397 parts: vec![MessagePart::ToolResult {
1398 tool_use_id: "abc".to_owned(),
1399 content: "provider_type = \"openai\"".to_owned(),
1400 is_error: false,
1401 }],
1402 metadata: MessageMetadata::default(),
1403 });
1404
1405 let pool = agent
1406 .services
1407 .memory
1408 .persistence
1409 .memory
1410 .as_ref()
1411 .unwrap()
1412 .sqlite()
1413 .pool()
1414 .clone();
1415
1416 agent
1418 .enqueue_graph_extraction_task(
1419 "I prefer Rust for systems programming",
1420 false,
1421 false,
1422 )
1423 .await;
1424
1425 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1426
1427 let store = GraphStore::new(pool);
1429 let count = store.get_metadata("extraction_count").await.unwrap();
1430 assert!(
1431 count.is_some(),
1432 "conversational message must trigger extraction even with prior tool result in history"
1433 );
1434 }
1435 }
1436
1437 mod persona_extraction_guards {
1439 use super::*;
1440 use zeph_config::PersonaConfig;
1441 use zeph_llm::provider::MessageMetadata;
1442
1443 fn enabled_persona_config() -> PersonaConfig {
1444 PersonaConfig {
1445 enabled: true,
1446 min_messages: 1,
1447 ..PersonaConfig::default()
1448 }
1449 }
1450
1451 async fn agent_with_persona(
1452 provider: &AnyProvider,
1453 config: PersonaConfig,
1454 ) -> Agent<MockChannel> {
1455 let memory =
1456 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1457 let cid = memory.sqlite().create_conversation().await.unwrap();
1458 let mut agent = Agent::new(
1459 provider.clone(),
1460 MockChannel::new(vec![]),
1461 create_test_registry(),
1462 None,
1463 5,
1464 MockToolExecutor::no_tools(),
1465 )
1466 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1467 agent.services.memory.extraction.persona_config = config;
1468 agent
1469 }
1470
1471 #[tokio::test]
1472 async fn disabled_config_skips_spawn() {
1473 let provider = mock_provider(vec![]);
1475 let mut agent = agent_with_persona(
1476 &provider,
1477 PersonaConfig {
1478 enabled: false,
1479 ..PersonaConfig::default()
1480 },
1481 )
1482 .await;
1483
1484 agent.msg.messages.push(zeph_llm::provider::Message {
1486 role: Role::User,
1487 content: "I prefer Rust for systems programming".to_owned(),
1488 parts: vec![],
1489 metadata: MessageMetadata::default(),
1490 });
1491
1492 agent.enqueue_persona_extraction_task();
1493
1494 let store = agent
1495 .services
1496 .memory
1497 .persistence
1498 .memory
1499 .as_ref()
1500 .unwrap()
1501 .sqlite()
1502 .clone();
1503 let count = store.count_persona_facts().await.unwrap();
1504 assert_eq!(count, 0, "disabled persona config must not write any facts");
1505 }
1506
1507 #[tokio::test]
1508 async fn below_min_messages_skips_spawn() {
1509 let provider = mock_provider(vec![]);
1511 let mut agent = agent_with_persona(
1512 &provider,
1513 PersonaConfig {
1514 enabled: true,
1515 min_messages: 3,
1516 ..PersonaConfig::default()
1517 },
1518 )
1519 .await;
1520
1521 for text in ["I use Rust", "I prefer async code"] {
1522 agent.msg.messages.push(zeph_llm::provider::Message {
1523 role: Role::User,
1524 content: text.to_owned(),
1525 parts: vec![],
1526 metadata: MessageMetadata::default(),
1527 });
1528 }
1529
1530 agent.enqueue_persona_extraction_task();
1531
1532 let store = agent
1533 .services
1534 .memory
1535 .persistence
1536 .memory
1537 .as_ref()
1538 .unwrap()
1539 .sqlite()
1540 .clone();
1541 let count = store.count_persona_facts().await.unwrap();
1542 assert_eq!(
1543 count, 0,
1544 "below min_messages threshold must not trigger extraction"
1545 );
1546 }
1547
1548 #[tokio::test]
1549 async fn no_memory_skips_spawn() {
1550 let provider = mock_provider(vec![]);
1552 let channel = MockChannel::new(vec![]);
1553 let registry = create_test_registry();
1554 let executor = MockToolExecutor::no_tools();
1555 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1556 agent.services.memory.extraction.persona_config = enabled_persona_config();
1557 agent.msg.messages.push(zeph_llm::provider::Message {
1558 role: Role::User,
1559 content: "I like Rust".to_owned(),
1560 parts: vec![],
1561 metadata: MessageMetadata::default(),
1562 });
1563
1564 agent.enqueue_persona_extraction_task();
1566 }
1567
1568 #[tokio::test]
1569 async fn enabled_enough_messages_spawns_extraction() {
1570 use zeph_llm::mock::MockProvider;
1573 let (mock, recorded) = MockProvider::default().with_recording();
1574 let provider = AnyProvider::Mock(mock);
1575 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1576
1577 agent.msg.messages.push(zeph_llm::provider::Message {
1578 role: Role::User,
1579 content: "I prefer Rust for systems programming".to_owned(),
1580 parts: vec![],
1581 metadata: MessageMetadata::default(),
1582 });
1583
1584 agent.enqueue_persona_extraction_task();
1585
1586 agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1588
1589 let calls = recorded.lock().unwrap();
1590 assert!(
1591 !calls.is_empty(),
1592 "happy-path: provider.chat() must be called when extraction completes"
1593 );
1594 }
1595
1596 #[tokio::test]
1597 async fn messages_capped_at_eight() {
1598 use zeph_llm::mock::MockProvider;
1601 let (mock, recorded) = MockProvider::default().with_recording();
1602 let provider = AnyProvider::Mock(mock);
1603 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1604
1605 for i in 0..12u32 {
1606 agent.msg.messages.push(zeph_llm::provider::Message {
1607 role: Role::User,
1608 content: format!("I like message {i}"),
1609 parts: vec![],
1610 metadata: MessageMetadata::default(),
1611 });
1612 }
1613
1614 agent.enqueue_persona_extraction_task();
1615
1616 agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1618
1619 let calls = recorded.lock().unwrap();
1621 assert!(
1622 !calls.is_empty(),
1623 "extraction must run when enough messages present"
1624 );
1625 let prompt = &calls[0];
1627 let user_text = prompt
1628 .iter()
1629 .filter(|m| m.role == Role::User)
1630 .map(|m| m.content.as_str())
1631 .collect::<Vec<_>>()
1632 .join(" ");
1633 assert!(
1635 !user_text.contains("I like message 8"),
1636 "message index 8 must be excluded from extraction input"
1637 );
1638 }
1639
1640 #[test]
1641 fn long_message_truncated_at_char_boundary() {
1642 let long_content = "x".repeat(3000);
1646 let truncated = if long_content.len() > 2048 {
1647 long_content[..long_content.floor_char_boundary(2048)].to_owned()
1648 } else {
1649 long_content.clone()
1650 };
1651 assert_eq!(
1652 truncated.len(),
1653 2048,
1654 "ASCII content must be truncated to exactly 2048 bytes"
1655 );
1656
1657 let multi = "é".repeat(1500); let truncated_multi = if multi.len() > 2048 {
1660 multi[..multi.floor_char_boundary(2048)].to_owned()
1661 } else {
1662 multi.clone()
1663 };
1664 assert!(
1665 truncated_multi.len() <= 2048,
1666 "multi-byte content must not exceed 2048 bytes"
1667 );
1668 assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1669 }
1670 }
1671
1672 #[tokio::test]
1673 async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1674 let provider = mock_provider(vec![]);
1675 let channel = MockChannel::new(vec![]);
1676 let registry = create_test_registry();
1677 let executor = MockToolExecutor::no_tools();
1678
1679 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1680 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1681 let cid = memory.sqlite().create_conversation().await.unwrap();
1682
1683 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1685 .with_metrics(tx)
1686 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1687 agent.services.memory.persistence.autosave_assistant = false;
1688 agent.services.memory.persistence.autosave_min_length = 20;
1689
1690 let long_user_msg = "A".repeat(100);
1691 agent
1692 .persist_message(Role::User, &long_user_msg, &[], false)
1693 .await;
1694
1695 let history = agent
1696 .services
1697 .memory
1698 .persistence
1699 .memory
1700 .as_ref()
1701 .unwrap()
1702 .sqlite()
1703 .load_history(cid, 50)
1704 .await
1705 .unwrap();
1706 assert_eq!(history.len(), 1, "user message must be saved");
1707 assert_eq!(rx.borrow().sqlite_message_count, 1);
1710 }
1711
1712 #[tokio::test]
1716 async fn persist_message_saves_correct_tool_use_parts() {
1717 use zeph_llm::provider::MessagePart;
1718
1719 let provider = mock_provider(vec![]);
1720 let channel = MockChannel::new(vec![]);
1721 let registry = create_test_registry();
1722 let executor = MockToolExecutor::no_tools();
1723
1724 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1725 let cid = memory.sqlite().create_conversation().await.unwrap();
1726
1727 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1728 std::sync::Arc::new(memory),
1729 cid,
1730 50,
1731 5,
1732 100,
1733 );
1734
1735 let parts = vec![MessagePart::ToolUse {
1736 id: "call_abc123".to_string(),
1737 name: "read_file".to_string(),
1738 input: serde_json::json!({"path": "/tmp/test.txt"}),
1739 }];
1740 let content = "[tool_use: read_file(call_abc123)]";
1741
1742 agent
1743 .persist_message(Role::Assistant, content, &parts, false)
1744 .await;
1745
1746 let history = agent
1747 .services
1748 .memory
1749 .persistence
1750 .memory
1751 .as_ref()
1752 .unwrap()
1753 .sqlite()
1754 .load_history(cid, 50)
1755 .await
1756 .unwrap();
1757
1758 assert_eq!(history.len(), 1);
1759 assert_eq!(history[0].role, Role::Assistant);
1760 assert_eq!(history[0].content, content);
1761 assert_eq!(history[0].parts.len(), 1);
1762 match &history[0].parts[0] {
1763 MessagePart::ToolUse { id, name, .. } => {
1764 assert_eq!(id, "call_abc123");
1765 assert_eq!(name, "read_file");
1766 }
1767 other => panic!("expected ToolUse part, got {other:?}"),
1768 }
1769 assert!(
1771 !history[0]
1772 .parts
1773 .iter()
1774 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1775 "assistant message must not contain ToolResult parts"
1776 );
1777 }
1778
1779 #[tokio::test]
1780 async fn persist_message_saves_correct_tool_result_parts() {
1781 use zeph_llm::provider::MessagePart;
1782
1783 let provider = mock_provider(vec![]);
1784 let channel = MockChannel::new(vec![]);
1785 let registry = create_test_registry();
1786 let executor = MockToolExecutor::no_tools();
1787
1788 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1789 let cid = memory.sqlite().create_conversation().await.unwrap();
1790
1791 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1792 std::sync::Arc::new(memory),
1793 cid,
1794 50,
1795 5,
1796 100,
1797 );
1798
1799 let parts = vec![MessagePart::ToolResult {
1800 tool_use_id: "call_abc123".to_string(),
1801 content: "file contents here".to_string(),
1802 is_error: false,
1803 }];
1804 let content = "[tool_result: call_abc123]\nfile contents here";
1805
1806 agent
1807 .persist_message(Role::User, content, &parts, false)
1808 .await;
1809
1810 let history = agent
1811 .services
1812 .memory
1813 .persistence
1814 .memory
1815 .as_ref()
1816 .unwrap()
1817 .sqlite()
1818 .load_history(cid, 50)
1819 .await
1820 .unwrap();
1821
1822 assert_eq!(history.len(), 1);
1823 assert_eq!(history[0].role, Role::User);
1824 assert_eq!(history[0].content, content);
1825 assert_eq!(history[0].parts.len(), 1);
1826 match &history[0].parts[0] {
1827 MessagePart::ToolResult {
1828 tool_use_id,
1829 content: result_content,
1830 is_error,
1831 } => {
1832 assert_eq!(tool_use_id, "call_abc123");
1833 assert_eq!(result_content, "file contents here");
1834 assert!(!is_error);
1835 }
1836 other => panic!("expected ToolResult part, got {other:?}"),
1837 }
1838 assert!(
1840 !history[0]
1841 .parts
1842 .iter()
1843 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1844 "user ToolResult message must not contain ToolUse parts"
1845 );
1846 }
1847
1848 #[tokio::test]
1849 async fn persist_message_roundtrip_preserves_role_part_alignment() {
1850 use zeph_llm::provider::MessagePart;
1851
1852 let provider = mock_provider(vec![]);
1853 let channel = MockChannel::new(vec![]);
1854 let registry = create_test_registry();
1855 let executor = MockToolExecutor::no_tools();
1856
1857 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1858 let cid = memory.sqlite().create_conversation().await.unwrap();
1859
1860 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1861 std::sync::Arc::new(memory),
1862 cid,
1863 50,
1864 5,
1865 100,
1866 );
1867
1868 let assistant_parts = vec![MessagePart::ToolUse {
1870 id: "id_1".to_string(),
1871 name: "list_dir".to_string(),
1872 input: serde_json::json!({"path": "/tmp"}),
1873 }];
1874 agent
1875 .persist_message(
1876 Role::Assistant,
1877 "[tool_use: list_dir(id_1)]",
1878 &assistant_parts,
1879 false,
1880 )
1881 .await;
1882
1883 let user_parts = vec![MessagePart::ToolResult {
1885 tool_use_id: "id_1".to_string(),
1886 content: "file1.txt\nfile2.txt".to_string(),
1887 is_error: false,
1888 }];
1889 agent
1890 .persist_message(
1891 Role::User,
1892 "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1893 &user_parts,
1894 false,
1895 )
1896 .await;
1897
1898 let history = agent
1899 .services
1900 .memory
1901 .persistence
1902 .memory
1903 .as_ref()
1904 .unwrap()
1905 .sqlite()
1906 .load_history(cid, 50)
1907 .await
1908 .unwrap();
1909
1910 assert_eq!(history.len(), 2);
1911
1912 assert_eq!(history[0].role, Role::Assistant);
1914 assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1915 assert!(
1916 matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1917 "first message must be assistant ToolUse"
1918 );
1919
1920 assert_eq!(history[1].role, Role::User);
1922 assert_eq!(
1923 history[1].content,
1924 "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1925 );
1926 assert!(
1927 matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1928 "second message must be user ToolResult"
1929 );
1930
1931 assert!(
1933 !history[0]
1934 .parts
1935 .iter()
1936 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1937 "assistant message must not have ToolResult parts"
1938 );
1939 assert!(
1940 !history[1]
1941 .parts
1942 .iter()
1943 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1944 "user message must not have ToolUse parts"
1945 );
1946 }
1947
1948 #[tokio::test]
1949 async fn persist_message_saves_correct_tool_output_parts() {
1950 use zeph_llm::provider::MessagePart;
1951
1952 let provider = mock_provider(vec![]);
1953 let channel = MockChannel::new(vec![]);
1954 let registry = create_test_registry();
1955 let executor = MockToolExecutor::no_tools();
1956
1957 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1958 let cid = memory.sqlite().create_conversation().await.unwrap();
1959
1960 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1961 std::sync::Arc::new(memory),
1962 cid,
1963 50,
1964 5,
1965 100,
1966 );
1967
1968 let parts = vec![MessagePart::ToolOutput {
1969 tool_name: "shell".into(),
1970 body: "hello from shell".to_string(),
1971 compacted_at: None,
1972 }];
1973 let content = "[tool: shell]\nhello from shell";
1974
1975 agent
1976 .persist_message(Role::User, content, &parts, false)
1977 .await;
1978
1979 let history = agent
1980 .services
1981 .memory
1982 .persistence
1983 .memory
1984 .as_ref()
1985 .unwrap()
1986 .sqlite()
1987 .load_history(cid, 50)
1988 .await
1989 .unwrap();
1990
1991 assert_eq!(history.len(), 1);
1992 assert_eq!(history[0].role, Role::User);
1993 assert_eq!(history[0].content, content);
1994 assert_eq!(history[0].parts.len(), 1);
1995 match &history[0].parts[0] {
1996 MessagePart::ToolOutput {
1997 tool_name,
1998 body,
1999 compacted_at,
2000 } => {
2001 assert_eq!(tool_name, "shell");
2002 assert_eq!(body, "hello from shell");
2003 assert!(compacted_at.is_none());
2004 }
2005 other => panic!("expected ToolOutput part, got {other:?}"),
2006 }
2007 }
2008
2009 #[tokio::test]
2012 async fn load_history_removes_trailing_orphan_tool_use() {
2013 use zeph_llm::provider::MessagePart;
2014
2015 let provider = mock_provider(vec![]);
2016 let channel = MockChannel::new(vec![]);
2017 let registry = create_test_registry();
2018 let executor = MockToolExecutor::no_tools();
2019
2020 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2021 let cid = memory.sqlite().create_conversation().await.unwrap();
2022 let sqlite = memory.sqlite();
2023
2024 sqlite
2026 .save_message(cid, "user", "do something with a tool")
2027 .await
2028 .unwrap();
2029
2030 let parts = serde_json::to_string(&[MessagePart::ToolUse {
2032 id: "call_orphan".to_string(),
2033 name: "shell".to_string(),
2034 input: serde_json::json!({"command": "ls"}),
2035 }])
2036 .unwrap();
2037 sqlite
2038 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2039 .await
2040 .unwrap();
2041
2042 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2043 std::sync::Arc::new(memory),
2044 cid,
2045 50,
2046 5,
2047 100,
2048 );
2049
2050 let messages_before = agent.msg.messages.len();
2051 agent.load_history().await.unwrap();
2052
2053 assert_eq!(
2055 agent.msg.messages.len(),
2056 messages_before + 1,
2057 "orphaned trailing tool_use must be removed"
2058 );
2059 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2060 }
2061
2062 #[tokio::test]
2063 async fn load_history_removes_leading_orphan_tool_result() {
2064 use zeph_llm::provider::MessagePart;
2065
2066 let provider = mock_provider(vec![]);
2067 let channel = MockChannel::new(vec![]);
2068 let registry = create_test_registry();
2069 let executor = MockToolExecutor::no_tools();
2070
2071 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2072 let cid = memory.sqlite().create_conversation().await.unwrap();
2073 let sqlite = memory.sqlite();
2074
2075 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2077 tool_use_id: "call_missing".to_string(),
2078 content: "result data".to_string(),
2079 is_error: false,
2080 }])
2081 .unwrap();
2082 sqlite
2083 .save_message_with_parts(
2084 cid,
2085 "user",
2086 "[tool_result: call_missing]\nresult data",
2087 &result_parts,
2088 )
2089 .await
2090 .unwrap();
2091
2092 sqlite
2094 .save_message(cid, "assistant", "here is my response")
2095 .await
2096 .unwrap();
2097
2098 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2099 std::sync::Arc::new(memory),
2100 cid,
2101 50,
2102 5,
2103 100,
2104 );
2105
2106 let messages_before = agent.msg.messages.len();
2107 agent.load_history().await.unwrap();
2108
2109 assert_eq!(
2111 agent.msg.messages.len(),
2112 messages_before + 1,
2113 "orphaned leading tool_result must be removed"
2114 );
2115 assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2116 }
2117
2118 #[tokio::test]
2119 async fn load_history_preserves_complete_tool_pairs() {
2120 use zeph_llm::provider::MessagePart;
2121
2122 let provider = mock_provider(vec![]);
2123 let channel = MockChannel::new(vec![]);
2124 let registry = create_test_registry();
2125 let executor = MockToolExecutor::no_tools();
2126
2127 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2128 let cid = memory.sqlite().create_conversation().await.unwrap();
2129 let sqlite = memory.sqlite();
2130
2131 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2133 id: "call_ok".to_string(),
2134 name: "shell".to_string(),
2135 input: serde_json::json!({"command": "pwd"}),
2136 }])
2137 .unwrap();
2138 sqlite
2139 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2140 .await
2141 .unwrap();
2142
2143 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2144 tool_use_id: "call_ok".to_string(),
2145 content: "/home/user".to_string(),
2146 is_error: false,
2147 }])
2148 .unwrap();
2149 sqlite
2150 .save_message_with_parts(
2151 cid,
2152 "user",
2153 "[tool_result: call_ok]\n/home/user",
2154 &result_parts,
2155 )
2156 .await
2157 .unwrap();
2158
2159 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2160 std::sync::Arc::new(memory),
2161 cid,
2162 50,
2163 5,
2164 100,
2165 );
2166
2167 let messages_before = agent.msg.messages.len();
2168 agent.load_history().await.unwrap();
2169
2170 assert_eq!(
2172 agent.msg.messages.len(),
2173 messages_before + 2,
2174 "complete tool_use/tool_result pair must be preserved"
2175 );
2176 assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2177 assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2178 }
2179
2180 #[tokio::test]
2181 async fn load_history_handles_multiple_trailing_orphans() {
2182 use zeph_llm::provider::MessagePart;
2183
2184 let provider = mock_provider(vec![]);
2185 let channel = MockChannel::new(vec![]);
2186 let registry = create_test_registry();
2187 let executor = MockToolExecutor::no_tools();
2188
2189 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2190 let cid = memory.sqlite().create_conversation().await.unwrap();
2191 let sqlite = memory.sqlite();
2192
2193 sqlite.save_message(cid, "user", "start").await.unwrap();
2195
2196 let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2198 id: "call_1".to_string(),
2199 name: "shell".to_string(),
2200 input: serde_json::json!({}),
2201 }])
2202 .unwrap();
2203 sqlite
2204 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2205 .await
2206 .unwrap();
2207
2208 let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2210 id: "call_2".to_string(),
2211 name: "read_file".to_string(),
2212 input: serde_json::json!({}),
2213 }])
2214 .unwrap();
2215 sqlite
2216 .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2217 .await
2218 .unwrap();
2219
2220 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2221 std::sync::Arc::new(memory),
2222 cid,
2223 50,
2224 5,
2225 100,
2226 );
2227
2228 let messages_before = agent.msg.messages.len();
2229 agent.load_history().await.unwrap();
2230
2231 assert_eq!(
2233 agent.msg.messages.len(),
2234 messages_before + 1,
2235 "all trailing orphaned tool_use messages must be removed"
2236 );
2237 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2238 }
2239
2240 #[tokio::test]
2241 async fn load_history_no_tool_messages_unchanged() {
2242 let provider = mock_provider(vec![]);
2243 let channel = MockChannel::new(vec![]);
2244 let registry = create_test_registry();
2245 let executor = MockToolExecutor::no_tools();
2246
2247 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2248 let cid = memory.sqlite().create_conversation().await.unwrap();
2249 let sqlite = memory.sqlite();
2250
2251 sqlite.save_message(cid, "user", "hello").await.unwrap();
2252 sqlite
2253 .save_message(cid, "assistant", "hi there")
2254 .await
2255 .unwrap();
2256 sqlite
2257 .save_message(cid, "user", "how are you?")
2258 .await
2259 .unwrap();
2260
2261 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2262 std::sync::Arc::new(memory),
2263 cid,
2264 50,
2265 5,
2266 100,
2267 );
2268
2269 let messages_before = agent.msg.messages.len();
2270 agent.load_history().await.unwrap();
2271
2272 assert_eq!(
2274 agent.msg.messages.len(),
2275 messages_before + 3,
2276 "plain messages without tool parts must pass through unchanged"
2277 );
2278 }
2279
2280 #[tokio::test]
2281 async fn load_history_removes_both_leading_and_trailing_orphans() {
2282 use zeph_llm::provider::MessagePart;
2283
2284 let provider = mock_provider(vec![]);
2285 let channel = MockChannel::new(vec![]);
2286 let registry = create_test_registry();
2287 let executor = MockToolExecutor::no_tools();
2288
2289 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2290 let cid = memory.sqlite().create_conversation().await.unwrap();
2291 let sqlite = memory.sqlite();
2292
2293 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2295 tool_use_id: "call_leading".to_string(),
2296 content: "orphaned result".to_string(),
2297 is_error: false,
2298 }])
2299 .unwrap();
2300 sqlite
2301 .save_message_with_parts(
2302 cid,
2303 "user",
2304 "[tool_result: call_leading]\norphaned result",
2305 &result_parts,
2306 )
2307 .await
2308 .unwrap();
2309
2310 sqlite
2312 .save_message(cid, "user", "what is 2+2?")
2313 .await
2314 .unwrap();
2315 sqlite.save_message(cid, "assistant", "4").await.unwrap();
2316
2317 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2319 id: "call_trailing".to_string(),
2320 name: "shell".to_string(),
2321 input: serde_json::json!({"command": "date"}),
2322 }])
2323 .unwrap();
2324 sqlite
2325 .save_message_with_parts(
2326 cid,
2327 "assistant",
2328 "[tool_use: shell(call_trailing)]",
2329 &use_parts,
2330 )
2331 .await
2332 .unwrap();
2333
2334 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2335 std::sync::Arc::new(memory),
2336 cid,
2337 50,
2338 5,
2339 100,
2340 );
2341
2342 let messages_before = agent.msg.messages.len();
2343 agent.load_history().await.unwrap();
2344
2345 assert_eq!(
2347 agent.msg.messages.len(),
2348 messages_before + 2,
2349 "both leading and trailing orphans must be removed"
2350 );
2351 assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2352 assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2353 assert_eq!(
2354 agent.msg.messages[messages_before + 1].role,
2355 Role::Assistant
2356 );
2357 assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2358 }
2359
2360 #[tokio::test]
2365 async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2366 use zeph_llm::provider::MessagePart;
2367
2368 let provider = mock_provider(vec![]);
2369 let channel = MockChannel::new(vec![]);
2370 let registry = create_test_registry();
2371 let executor = MockToolExecutor::no_tools();
2372
2373 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2374 let cid = memory.sqlite().create_conversation().await.unwrap();
2375 let sqlite = memory.sqlite();
2376
2377 sqlite
2379 .save_message(cid, "user", "first question")
2380 .await
2381 .unwrap();
2382 sqlite
2383 .save_message(cid, "assistant", "first answer")
2384 .await
2385 .unwrap();
2386
2387 let use_parts = serde_json::to_string(&[
2391 MessagePart::ToolUse {
2392 id: "call_mid_1".to_string(),
2393 name: "shell".to_string(),
2394 input: serde_json::json!({"command": "ls"}),
2395 },
2396 MessagePart::Text {
2397 text: "Let me check the files.".to_string(),
2398 },
2399 ])
2400 .unwrap();
2401 sqlite
2402 .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2403 .await
2404 .unwrap();
2405
2406 sqlite
2408 .save_message(cid, "user", "second question")
2409 .await
2410 .unwrap();
2411 sqlite
2412 .save_message(cid, "assistant", "second answer")
2413 .await
2414 .unwrap();
2415
2416 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2417 std::sync::Arc::new(memory),
2418 cid,
2419 50,
2420 5,
2421 100,
2422 );
2423
2424 let messages_before = agent.msg.messages.len();
2425 agent.load_history().await.unwrap();
2426
2427 assert_eq!(
2430 agent.msg.messages.len(),
2431 messages_before + 5,
2432 "message count must be 5 (orphan message kept — has text content)"
2433 );
2434
2435 let orphan = &agent.msg.messages[messages_before + 2];
2437 assert_eq!(orphan.role, Role::Assistant);
2438 assert!(
2439 !orphan
2440 .parts
2441 .iter()
2442 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2443 "orphaned ToolUse parts must be stripped from mid-history message"
2444 );
2445 assert!(
2447 orphan.parts.iter().any(
2448 |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2449 ),
2450 "text content of orphaned assistant message must be preserved"
2451 );
2452 }
2453
2454 #[tokio::test]
2459 async fn load_history_keeps_tool_only_user_message() {
2460 use zeph_llm::provider::MessagePart;
2461
2462 let provider = mock_provider(vec![]);
2463 let channel = MockChannel::new(vec![]);
2464 let registry = create_test_registry();
2465 let executor = MockToolExecutor::no_tools();
2466
2467 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2468 let cid = memory.sqlite().create_conversation().await.unwrap();
2469 let sqlite = memory.sqlite();
2470
2471 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2473 id: "call_rc3".to_string(),
2474 name: "memory_save".to_string(),
2475 input: serde_json::json!({"content": "something"}),
2476 }])
2477 .unwrap();
2478 sqlite
2479 .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2480 .await
2481 .unwrap();
2482
2483 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2485 tool_use_id: "call_rc3".to_string(),
2486 content: "saved".to_string(),
2487 is_error: false,
2488 }])
2489 .unwrap();
2490 sqlite
2491 .save_message_with_parts(cid, "user", "", &result_parts)
2492 .await
2493 .unwrap();
2494
2495 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2496
2497 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2498 std::sync::Arc::new(memory),
2499 cid,
2500 50,
2501 5,
2502 100,
2503 );
2504
2505 let messages_before = agent.msg.messages.len();
2506 agent.load_history().await.unwrap();
2507
2508 assert_eq!(
2511 agent.msg.messages.len(),
2512 messages_before + 3,
2513 "user message with empty content but ToolResult parts must not be dropped"
2514 );
2515
2516 let user_msg = &agent.msg.messages[messages_before + 1];
2518 assert_eq!(user_msg.role, Role::User);
2519 assert!(
2520 user_msg.parts.iter().any(
2521 |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2522 ),
2523 "ToolResult part must be preserved on user message with empty content"
2524 );
2525 }
2526
2527 #[tokio::test]
2531 async fn strip_orphans_removes_orphaned_tool_result() {
2532 use zeph_llm::provider::MessagePart;
2533
2534 let provider = mock_provider(vec![]);
2535 let channel = MockChannel::new(vec![]);
2536 let registry = create_test_registry();
2537 let executor = MockToolExecutor::no_tools();
2538
2539 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2540 let cid = memory.sqlite().create_conversation().await.unwrap();
2541 let sqlite = memory.sqlite();
2542
2543 sqlite.save_message(cid, "user", "hello").await.unwrap();
2545 sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2546
2547 sqlite
2549 .save_message(cid, "assistant", "plain answer")
2550 .await
2551 .unwrap();
2552
2553 let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2555 tool_use_id: "call_nonexistent".to_string(),
2556 content: "stale result".to_string(),
2557 is_error: false,
2558 }])
2559 .unwrap();
2560 sqlite
2561 .save_message_with_parts(
2562 cid,
2563 "user",
2564 "[tool_result: call_nonexistent]\nstale result",
2565 &orphan_result_parts,
2566 )
2567 .await
2568 .unwrap();
2569
2570 sqlite
2571 .save_message(cid, "assistant", "final")
2572 .await
2573 .unwrap();
2574
2575 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2576 std::sync::Arc::new(memory),
2577 cid,
2578 50,
2579 5,
2580 100,
2581 );
2582
2583 let messages_before = agent.msg.messages.len();
2584 agent.load_history().await.unwrap();
2585
2586 let loaded = &agent.msg.messages[messages_before..];
2590 for msg in loaded {
2591 assert!(
2592 !msg.parts.iter().any(|p| matches!(
2593 p,
2594 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2595 )),
2596 "orphaned ToolResult part must be stripped from history"
2597 );
2598 }
2599 }
2600
2601 #[tokio::test]
2604 async fn strip_orphans_keeps_complete_pair() {
2605 use zeph_llm::provider::MessagePart;
2606
2607 let provider = mock_provider(vec![]);
2608 let channel = MockChannel::new(vec![]);
2609 let registry = create_test_registry();
2610 let executor = MockToolExecutor::no_tools();
2611
2612 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2613 let cid = memory.sqlite().create_conversation().await.unwrap();
2614 let sqlite = memory.sqlite();
2615
2616 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2617 id: "call_valid".to_string(),
2618 name: "shell".to_string(),
2619 input: serde_json::json!({"command": "ls"}),
2620 }])
2621 .unwrap();
2622 sqlite
2623 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2624 .await
2625 .unwrap();
2626
2627 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2628 tool_use_id: "call_valid".to_string(),
2629 content: "file.rs".to_string(),
2630 is_error: false,
2631 }])
2632 .unwrap();
2633 sqlite
2634 .save_message_with_parts(cid, "user", "", &result_parts)
2635 .await
2636 .unwrap();
2637
2638 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2639 std::sync::Arc::new(memory),
2640 cid,
2641 50,
2642 5,
2643 100,
2644 );
2645
2646 let messages_before = agent.msg.messages.len();
2647 agent.load_history().await.unwrap();
2648
2649 assert_eq!(
2650 agent.msg.messages.len(),
2651 messages_before + 2,
2652 "complete tool_use/tool_result pair must be preserved"
2653 );
2654
2655 let user_msg = &agent.msg.messages[messages_before + 1];
2656 assert!(
2657 user_msg.parts.iter().any(|p| matches!(
2658 p,
2659 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2660 )),
2661 "ToolResult part for a matched tool_use must not be stripped"
2662 );
2663 }
2664
2665 #[tokio::test]
2668 async fn strip_orphans_mixed_history() {
2669 use zeph_llm::provider::MessagePart;
2670
2671 let provider = mock_provider(vec![]);
2672 let channel = MockChannel::new(vec![]);
2673 let registry = create_test_registry();
2674 let executor = MockToolExecutor::no_tools();
2675
2676 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2677 let cid = memory.sqlite().create_conversation().await.unwrap();
2678 let sqlite = memory.sqlite();
2679
2680 let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2682 id: "call_good".to_string(),
2683 name: "shell".to_string(),
2684 input: serde_json::json!({"command": "pwd"}),
2685 }])
2686 .unwrap();
2687 sqlite
2688 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2689 .await
2690 .unwrap();
2691
2692 let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2693 tool_use_id: "call_good".to_string(),
2694 content: "/home".to_string(),
2695 is_error: false,
2696 }])
2697 .unwrap();
2698 sqlite
2699 .save_message_with_parts(cid, "user", "", &result_parts_ok)
2700 .await
2701 .unwrap();
2702
2703 sqlite
2705 .save_message(cid, "assistant", "text only")
2706 .await
2707 .unwrap();
2708
2709 let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2710 tool_use_id: "call_ghost".to_string(),
2711 content: "ghost result".to_string(),
2712 is_error: false,
2713 }])
2714 .unwrap();
2715 sqlite
2716 .save_message_with_parts(
2717 cid,
2718 "user",
2719 "[tool_result: call_ghost]\nghost result",
2720 &orphan_parts,
2721 )
2722 .await
2723 .unwrap();
2724
2725 sqlite
2726 .save_message(cid, "assistant", "final reply")
2727 .await
2728 .unwrap();
2729
2730 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2731 std::sync::Arc::new(memory),
2732 cid,
2733 50,
2734 5,
2735 100,
2736 );
2737
2738 let messages_before = agent.msg.messages.len();
2739 agent.load_history().await.unwrap();
2740
2741 let loaded = &agent.msg.messages[messages_before..];
2742
2743 for msg in loaded {
2745 assert!(
2746 !msg.parts.iter().any(|p| matches!(
2747 p,
2748 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2749 )),
2750 "orphaned ToolResult (call_ghost) must be stripped from history"
2751 );
2752 }
2753
2754 let has_good_result = loaded.iter().any(|msg| {
2757 msg.role == Role::User
2758 && msg.parts.iter().any(|p| {
2759 matches!(
2760 p,
2761 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2762 )
2763 })
2764 });
2765 assert!(
2766 has_good_result,
2767 "matched ToolResult (call_good) must be preserved in history"
2768 );
2769 }
2770
2771 #[tokio::test]
2774 async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2775 use zeph_llm::provider::MessagePart;
2776
2777 let provider = mock_provider(vec![]);
2778 let channel = MockChannel::new(vec![]);
2779 let registry = create_test_registry();
2780 let executor = MockToolExecutor::no_tools();
2781
2782 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2783 let cid = memory.sqlite().create_conversation().await.unwrap();
2784 let sqlite = memory.sqlite();
2785
2786 sqlite
2787 .save_message(cid, "user", "run a command")
2788 .await
2789 .unwrap();
2790
2791 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2793 id: "call_ok".to_string(),
2794 name: "shell".to_string(),
2795 input: serde_json::json!({"command": "echo hi"}),
2796 }])
2797 .unwrap();
2798 sqlite
2799 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2800 .await
2801 .unwrap();
2802
2803 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2805 tool_use_id: "call_ok".to_string(),
2806 content: "hi".to_string(),
2807 is_error: false,
2808 }])
2809 .unwrap();
2810 sqlite
2811 .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2812 .await
2813 .unwrap();
2814
2815 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2816
2817 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2818 std::sync::Arc::new(memory),
2819 cid,
2820 50,
2821 5,
2822 100,
2823 );
2824
2825 let messages_before = agent.msg.messages.len();
2826 agent.load_history().await.unwrap();
2827
2828 assert_eq!(
2830 agent.msg.messages.len(),
2831 messages_before + 4,
2832 "matched tool pair must not be removed"
2833 );
2834 let tool_msg = &agent.msg.messages[messages_before + 1];
2835 assert!(
2836 tool_msg
2837 .parts
2838 .iter()
2839 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2840 "matched ToolUse parts must be preserved"
2841 );
2842 }
2843
2844 #[tokio::test]
2848 async fn persist_cancelled_tool_results_pairs_tool_use() {
2849 use zeph_llm::provider::MessagePart;
2850
2851 let provider = mock_provider(vec![]);
2852 let channel = MockChannel::new(vec![]);
2853 let registry = create_test_registry();
2854 let executor = MockToolExecutor::no_tools();
2855
2856 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2857 let cid = memory.sqlite().create_conversation().await.unwrap();
2858
2859 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2860 std::sync::Arc::new(memory),
2861 cid,
2862 50,
2863 5,
2864 100,
2865 );
2866
2867 let tool_calls = vec![
2869 zeph_llm::provider::ToolUseRequest {
2870 id: "cancel_id_1".to_string(),
2871 name: "shell".to_string().into(),
2872 input: serde_json::json!({}),
2873 },
2874 zeph_llm::provider::ToolUseRequest {
2875 id: "cancel_id_2".to_string(),
2876 name: "read_file".to_string().into(),
2877 input: serde_json::json!({}),
2878 },
2879 ];
2880
2881 agent.persist_cancelled_tool_results(&tool_calls).await;
2882
2883 let history = agent
2884 .services
2885 .memory
2886 .persistence
2887 .memory
2888 .as_ref()
2889 .unwrap()
2890 .sqlite()
2891 .load_history(cid, 50)
2892 .await
2893 .unwrap();
2894
2895 assert_eq!(history.len(), 1);
2897 assert_eq!(history[0].role, Role::User);
2898
2899 for tc in &tool_calls {
2901 assert!(
2902 history[0].parts.iter().any(|p| matches!(
2903 p,
2904 MessagePart::ToolResult { tool_use_id, is_error, .. }
2905 if tool_use_id == &tc.id && *is_error
2906 )),
2907 "tombstone ToolResult for {} must be present and is_error=true",
2908 tc.id
2909 );
2910 }
2911 }
2912
2913 #[tokio::test]
2924 async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
2925 use zeph_llm::provider::MessagePart;
2926
2927 let provider = mock_provider(vec![]);
2928 let channel = MockChannel::new(vec![]);
2929 let registry = create_test_registry();
2930 let executor = MockToolExecutor::no_tools();
2931
2932 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2933 let cid = memory.sqlite().create_conversation().await.unwrap();
2934 let sqlite = memory.sqlite();
2935
2936 sqlite
2938 .save_message(cid, "user", "save this for me")
2939 .await
2940 .unwrap();
2941
2942 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2945 id: "call_2529".to_string(),
2946 name: "memory_save".to_string(),
2947 input: serde_json::json!({"content": "save this"}),
2948 }])
2949 .unwrap();
2950 let orphan_assistant_id = sqlite
2951 .save_message_with_parts(
2952 cid,
2953 "assistant",
2954 "[tool_use: memory_save(call_2529)]",
2955 &use_parts,
2956 )
2957 .await
2958 .unwrap();
2959
2960 sqlite
2965 .save_message(cid, "assistant", "here is a plain reply")
2966 .await
2967 .unwrap();
2968
2969 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2970 tool_use_id: "call_2529".to_string(),
2971 content: "saved".to_string(),
2972 is_error: false,
2973 }])
2974 .unwrap();
2975 let orphan_user_id = sqlite
2976 .save_message_with_parts(
2977 cid,
2978 "user",
2979 "[tool_result: call_2529]\nsaved",
2980 &result_parts,
2981 )
2982 .await
2983 .unwrap();
2984
2985 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2987
2988 let memory_arc = std::sync::Arc::new(memory);
2989 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2990 memory_arc.clone(),
2991 cid,
2992 50,
2993 5,
2994 100,
2995 );
2996
2997 agent.load_history().await.unwrap();
2998
2999 let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3002 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3003 )
3004 .bind(orphan_assistant_id)
3005 .fetch_all(memory_arc.sqlite().pool())
3006 .await
3007 .unwrap();
3008
3009 let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3010 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3011 )
3012 .bind(orphan_user_id)
3013 .fetch_all(memory_arc.sqlite().pool())
3014 .await
3015 .unwrap();
3016
3017 assert_eq!(
3018 assistant_deleted_count.first().copied().unwrap_or(0),
3019 1,
3020 "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3021 );
3022 assert_eq!(
3023 user_deleted_count.first().copied().unwrap_or(0),
3024 1,
3025 "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3026 );
3027 }
3028
3029 #[tokio::test]
3033 async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3034 use zeph_llm::provider::MessagePart;
3035
3036 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3037 let cid = memory.sqlite().create_conversation().await.unwrap();
3038 let sqlite = memory.sqlite();
3039
3040 sqlite
3042 .save_message(cid, "user", "do something")
3043 .await
3044 .unwrap();
3045
3046 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3048 id: "call_idem".to_string(),
3049 name: "shell".to_string(),
3050 input: serde_json::json!({"command": "ls"}),
3051 }])
3052 .unwrap();
3053 sqlite
3054 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3055 .await
3056 .unwrap();
3057
3058 sqlite
3060 .save_message(cid, "assistant", "continuing")
3061 .await
3062 .unwrap();
3063
3064 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3066 tool_use_id: "call_idem".to_string(),
3067 content: "output".to_string(),
3068 is_error: false,
3069 }])
3070 .unwrap();
3071 sqlite
3072 .save_message_with_parts(
3073 cid,
3074 "user",
3075 "[tool_result: call_idem]\noutput",
3076 &result_parts,
3077 )
3078 .await
3079 .unwrap();
3080
3081 sqlite
3082 .save_message(cid, "assistant", "final")
3083 .await
3084 .unwrap();
3085
3086 let memory_arc = std::sync::Arc::new(memory);
3087
3088 let mut agent1 = Agent::new(
3090 mock_provider(vec![]),
3091 MockChannel::new(vec![]),
3092 create_test_registry(),
3093 None,
3094 5,
3095 MockToolExecutor::no_tools(),
3096 )
3097 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3098 agent1.load_history().await.unwrap();
3099 let count_after_first = agent1.msg.messages.len();
3100
3101 let mut agent2 = Agent::new(
3104 mock_provider(vec![]),
3105 MockChannel::new(vec![]),
3106 create_test_registry(),
3107 None,
3108 5,
3109 MockToolExecutor::no_tools(),
3110 )
3111 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3112 agent2.load_history().await.unwrap();
3113 let count_after_second = agent2.msg.messages.len();
3114
3115 assert_eq!(
3117 count_after_first, count_after_second,
3118 "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3119 );
3120 }
3121
3122 #[tokio::test]
3126 async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3127 use zeph_llm::provider::MessagePart;
3128
3129 let provider = mock_provider(vec![]);
3130 let channel = MockChannel::new(vec![]);
3131 let registry = create_test_registry();
3132 let executor = MockToolExecutor::no_tools();
3133
3134 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3135 let cid = memory.sqlite().create_conversation().await.unwrap();
3136 let sqlite = memory.sqlite();
3137
3138 sqlite
3140 .save_message(cid, "user", "check the files")
3141 .await
3142 .unwrap();
3143
3144 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3147 id: "call_mixed".to_string(),
3148 name: "shell".to_string(),
3149 input: serde_json::json!({"command": "ls"}),
3150 }])
3151 .unwrap();
3152 sqlite
3153 .save_message_with_parts(
3154 cid,
3155 "assistant",
3156 "Let me list the directory. [tool_use: shell(call_mixed)]",
3157 &use_parts,
3158 )
3159 .await
3160 .unwrap();
3161
3162 sqlite.save_message(cid, "user", "thanks").await.unwrap();
3164 sqlite
3165 .save_message(cid, "assistant", "you are welcome")
3166 .await
3167 .unwrap();
3168
3169 let memory_arc = std::sync::Arc::new(memory);
3170 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3171 memory_arc.clone(),
3172 cid,
3173 50,
3174 5,
3175 100,
3176 );
3177
3178 let messages_before = agent.msg.messages.len();
3179 agent.load_history().await.unwrap();
3180
3181 assert_eq!(
3183 agent.msg.messages.len(),
3184 messages_before + 4,
3185 "assistant message with text + tool tag must not be removed after ToolUse strip"
3186 );
3187
3188 let mixed_msg = agent
3190 .msg
3191 .messages
3192 .iter()
3193 .find(|m| m.content.contains("Let me list the directory"))
3194 .expect("mixed-content assistant message must still be in history");
3195 assert!(
3196 !mixed_msg
3197 .parts
3198 .iter()
3199 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3200 "orphaned ToolUse parts must be stripped even when message has meaningful text"
3201 );
3202 assert_eq!(
3203 mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3204 "content field must be unchanged — only parts are stripped"
3205 );
3206 }
3207
3208 #[tokio::test]
3211 async fn persist_message_skipped_tool_result_does_not_embed() {
3212 use zeph_llm::provider::MessagePart;
3213
3214 let provider = mock_provider(vec![]);
3215 let channel = MockChannel::new(vec![]);
3216 let registry = create_test_registry();
3217 let executor = MockToolExecutor::no_tools();
3218
3219 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3220 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3221 let cid = memory.sqlite().create_conversation().await.unwrap();
3222
3223 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3224 .with_metrics(tx)
3225 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3226 agent.services.memory.persistence.autosave_assistant = true;
3227 agent.services.memory.persistence.autosave_min_length = 0;
3228
3229 let parts = vec![MessagePart::ToolResult {
3230 tool_use_id: "tu1".into(),
3231 content: "[skipped] bash tool was blocked by utility gate".into(),
3232 is_error: false,
3233 }];
3234
3235 agent
3236 .persist_message(
3237 Role::User,
3238 "[skipped] bash tool was blocked by utility gate",
3239 &parts,
3240 false,
3241 )
3242 .await;
3243
3244 assert_eq!(
3245 rx.borrow().embeddings_generated,
3246 0,
3247 "[skipped] ToolResult must not be embedded into Qdrant"
3248 );
3249 }
3250
3251 #[tokio::test]
3252 async fn persist_message_stopped_tool_result_does_not_embed() {
3253 use zeph_llm::provider::MessagePart;
3254
3255 let provider = mock_provider(vec![]);
3256 let channel = MockChannel::new(vec![]);
3257 let registry = create_test_registry();
3258 let executor = MockToolExecutor::no_tools();
3259
3260 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3261 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3262 let cid = memory.sqlite().create_conversation().await.unwrap();
3263
3264 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3265 .with_metrics(tx)
3266 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3267 agent.services.memory.persistence.autosave_assistant = true;
3268 agent.services.memory.persistence.autosave_min_length = 0;
3269
3270 let parts = vec![MessagePart::ToolResult {
3271 tool_use_id: "tu2".into(),
3272 content: "[stopped] execution limit reached".into(),
3273 is_error: false,
3274 }];
3275
3276 agent
3277 .persist_message(
3278 Role::User,
3279 "[stopped] execution limit reached",
3280 &parts,
3281 false,
3282 )
3283 .await;
3284
3285 assert_eq!(
3286 rx.borrow().embeddings_generated,
3287 0,
3288 "[stopped] ToolResult must not be embedded into Qdrant"
3289 );
3290 }
3291
3292 #[tokio::test]
3293 async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3294 use zeph_llm::provider::MessagePart;
3297
3298 let provider = mock_provider(vec![]);
3299 let channel = MockChannel::new(vec![]);
3300 let registry = create_test_registry();
3301 let executor = MockToolExecutor::no_tools();
3302
3303 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3304 let cid = memory.sqlite().create_conversation().await.unwrap();
3305 let memory_arc = std::sync::Arc::new(memory);
3306
3307 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3308 memory_arc.clone(),
3309 cid,
3310 50,
3311 5,
3312 100,
3313 );
3314 agent.services.memory.persistence.autosave_assistant = true;
3315 agent.services.memory.persistence.autosave_min_length = 0;
3316
3317 let content = "total 42\ndrwxr-xr-x 5 user group";
3318 let parts = vec![MessagePart::ToolResult {
3319 tool_use_id: "tu3".into(),
3320 content: content.into(),
3321 is_error: false,
3322 }];
3323
3324 agent
3325 .persist_message(Role::User, content, &parts, false)
3326 .await;
3327
3328 let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3330 assert_eq!(
3331 history.len(),
3332 1,
3333 "normal ToolResult must be saved to SQLite"
3334 );
3335 assert_eq!(history[0].content, content);
3336 }
3337
3338 #[test]
3343 fn trajectory_extraction_slice_bounds_messages() {
3344 let max_messages: usize = 20;
3346 let total_messages = 100usize;
3347
3348 let tail_start = total_messages.saturating_sub(max_messages);
3349 let window = total_messages - tail_start;
3350
3351 assert_eq!(
3352 window, 20,
3353 "slice should contain exactly max_messages items"
3354 );
3355 assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3356 }
3357
3358 #[test]
3359 fn trajectory_extraction_slice_handles_few_messages() {
3360 let max_messages: usize = 20;
3361 let total_messages = 5usize;
3362
3363 let tail_start = total_messages.saturating_sub(max_messages);
3364 let window = total_messages - tail_start;
3365
3366 assert_eq!(window, 5, "should return all messages when fewer than max");
3367 assert_eq!(tail_start, 0, "slice should start from the beginning");
3368 }
3369
3370 #[tokio::test]
3376 async fn regression_3168_complete_tool_pair_survives_round_trip() {
3377 use zeph_llm::provider::MessagePart;
3378
3379 let provider = mock_provider(vec![]);
3380 let channel = MockChannel::new(vec![]);
3381 let registry = create_test_registry();
3382 let executor = MockToolExecutor::no_tools();
3383
3384 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3385 let cid = memory.sqlite().create_conversation().await.unwrap();
3386 let sqlite = memory.sqlite();
3387
3388 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3389 id: "r3168_call".to_string(),
3390 name: "shell".to_string(),
3391 input: serde_json::json!({"command": "echo hi"}),
3392 }])
3393 .unwrap();
3394 sqlite
3395 .save_message_with_parts(
3396 cid,
3397 "assistant",
3398 "[tool_use: shell(r3168_call)]",
3399 &use_parts,
3400 )
3401 .await
3402 .unwrap();
3403
3404 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3405 tool_use_id: "r3168_call".to_string(),
3406 content: "[skipped]".to_string(),
3407 is_error: false,
3408 }])
3409 .unwrap();
3410 sqlite
3411 .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3412 .await
3413 .unwrap();
3414
3415 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3416 std::sync::Arc::new(memory),
3417 cid,
3418 50,
3419 5,
3420 100,
3421 );
3422
3423 let base = agent.msg.messages.len();
3424 agent.load_history().await.unwrap();
3425
3426 assert_eq!(
3427 agent.msg.messages.len(),
3428 base + 2,
3429 "both messages of the complete pair must survive load_history"
3430 );
3431
3432 let assistant_msg = agent
3433 .msg
3434 .messages
3435 .iter()
3436 .find(|m| m.role == Role::Assistant)
3437 .expect("assistant message missing after load_history");
3438 assert!(
3439 assistant_msg
3440 .parts
3441 .iter()
3442 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3443 "ToolUse part must be preserved in assistant message"
3444 );
3445
3446 let user_msg = agent
3447 .msg
3448 .messages
3449 .iter()
3450 .rev()
3451 .find(|m| m.role == Role::User)
3452 .expect("user message missing after load_history");
3453 assert!(
3454 user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3455 "ToolResult part must be preserved in user message"
3456 );
3457 }
3458
3459 #[tokio::test]
3464 async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3465 use zeph_llm::provider::MessagePart;
3466
3467 let provider = mock_provider(vec![]);
3468 let channel = MockChannel::new(vec![]);
3469 let registry = create_test_registry();
3470 let executor = MockToolExecutor::no_tools();
3471
3472 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3473 let cid = memory.sqlite().create_conversation().await.unwrap();
3474 let sqlite = memory.sqlite();
3475
3476 sqlite
3480 .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3481 .await
3482 .unwrap();
3483
3484 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3486 tool_use_id: "corrupt".to_string(),
3487 content: "result".to_string(),
3488 is_error: false,
3489 }])
3490 .unwrap();
3491 sqlite
3492 .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3493 .await
3494 .unwrap();
3495
3496 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3497 std::sync::Arc::new(memory),
3498 cid,
3499 50,
3500 5,
3501 100,
3502 );
3503
3504 let base = agent.msg.messages.len();
3505 agent.load_history().await.unwrap();
3506
3507 let loaded = agent.msg.messages.len() - base;
3512 let orphan_present = agent.msg.messages.iter().any(|m| {
3515 m.role == Role::User
3516 && m.parts.iter().any(|p| {
3517 matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3518 })
3519 });
3520 assert!(
3521 !orphan_present,
3522 "orphaned ToolResult must not survive load_history; loaded={loaded}"
3523 );
3524 }
3525}