1use crate::channel::Channel;
5use zeph_agent_persistence::graph::{build_graph_extraction_config, collect_context_messages};
6use zeph_agent_persistence::{
7 LoadHistoryParams, MemoryPersistenceView, MetricsView, PersistMessageRequest,
8 PersistenceService, SecurityView,
9};
10use zeph_llm::provider::{LlmProvider as _, MessagePart, Role};
11
12use super::Agent;
13
14impl<C: Channel> Agent<C> {
15 #[tracing::instrument(name = "core.persist.load_history", skip_all, level = "debug", err)]
31 pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
32 let (Some(memory), Some(cid)) = (
33 self.services.memory.persistence.memory.as_ref(),
34 self.services.memory.persistence.conversation_id,
35 ) else {
36 return Ok(());
37 };
38
39 let memory = memory.clone();
41
42 let mut unsummarized = self.services.memory.persistence.unsummarized_count;
43 let memory_view = MemoryPersistenceView {
46 memory: Some(&memory),
47 conversation_id: self.services.memory.persistence.conversation_id,
48 autosave_assistant: self.services.memory.persistence.autosave_assistant,
49 autosave_min_length: self.services.memory.persistence.autosave_min_length,
50 unsummarized_count: &mut unsummarized,
51 goal_text: self.services.memory.extraction.goal_text.clone(),
52 };
53
54 let svc = PersistenceService::new();
55 let outcome = svc
56 .load_history(LoadHistoryParams {
57 messages: &mut self.msg.messages,
58 last_persisted_message_id: &mut self.msg.last_persisted_message_id,
59 deferred_hide_ids: &mut self.msg.deferred_db_hide_ids,
60 memory_view: &memory_view,
61 })
62 .await
63 .map_err(|e| {
64 super::error::AgentError::Memory(zeph_memory::MemoryError::Other(e.to_string()))
65 })?;
66
67 self.services.memory.persistence.unsummarized_count = unsummarized;
69
70 if outcome.messages_loaded > 0 {
71 let _ = memory
73 .sqlite()
74 .increment_session_counts_for_conversation(cid)
75 .await
76 .inspect_err(|e| {
77 tracing::warn!(error = %e, "failed to increment tier session counts");
78 });
79 }
80
81 self.update_metrics(|m| {
83 m.sqlite_message_count = outcome.sqlite_total_messages;
84 });
85 if let Ok(count) = memory.sqlite().count_semantic_facts().await {
86 let count_u64 = u64::try_from(count).unwrap_or(0);
87 self.update_metrics(|m| {
88 m.semantic_fact_count = count_u64;
89 });
90 }
91 if let Ok(count) = memory.unsummarized_message_count(cid).await {
92 self.services.memory.persistence.unsummarized_count =
93 usize::try_from(count).unwrap_or(0);
94 }
95
96 self.recompute_prompt_tokens();
97 Ok(())
98 }
99
100 #[tracing::instrument(name = "core.persist.persist_message", skip_all, level = "debug")]
106 pub(crate) async fn persist_message(
107 &mut self,
108 role: Role,
109 content: &str,
110 parts: &[MessagePart],
111 has_injection_flags: bool,
112 ) {
113 let guard_event = self
117 .services
118 .security
119 .exfiltration_guard
120 .should_guard_memory_write(has_injection_flags);
121 if let Some(ref event) = guard_event {
122 tracing::warn!(
123 ?event,
124 "exfiltration guard: skipping Qdrant embedding for flagged content"
125 );
126 self.push_security_event(
127 zeph_common::SecurityEventCategory::ExfiltrationBlock,
128 "memory_write",
129 "Qdrant embedding skipped: flagged content",
130 );
131 }
132
133 let req = PersistMessageRequest::from_borrowed(role, content, parts, has_injection_flags);
134
135 let mut unsummarized = self.services.memory.persistence.unsummarized_count;
136 let memory_arc = self.services.memory.persistence.memory.clone();
137 let mut memory_view = MemoryPersistenceView {
138 memory: memory_arc.as_ref(),
139 conversation_id: self.services.memory.persistence.conversation_id,
140 autosave_assistant: self.services.memory.persistence.autosave_assistant,
141 autosave_min_length: self.services.memory.persistence.autosave_min_length,
142 unsummarized_count: &mut unsummarized,
143 goal_text: self.services.memory.extraction.goal_text.clone(),
144 };
145 let security = SecurityView {
146 guard_memory_writes: guard_event.is_some(),
147 _phantom: std::marker::PhantomData,
148 };
149 let mut sqlite_delta = 0u64;
150 let mut embed_delta = 0u64;
151 let mut guard_delta = 0u64;
152 let mut metrics_view = MetricsView {
153 sqlite_message_count: &mut sqlite_delta,
154 embeddings_generated: &mut embed_delta,
155 exfiltration_memory_guards: &mut guard_delta,
156 };
157
158 let svc = PersistenceService::new();
159 let outcome = svc
160 .persist_message(
161 req,
162 &mut self.msg.last_persisted_message_id,
163 &mut memory_view,
164 &security,
165 &mut metrics_view,
166 )
167 .await;
168
169 self.services.memory.persistence.unsummarized_count = unsummarized;
171
172 self.update_metrics(|m| {
174 m.sqlite_message_count += sqlite_delta;
175 m.embeddings_generated += embed_delta;
176 m.exfiltration_memory_guards += guard_delta;
178 });
179
180 if outcome.message_id.is_none() {
181 return;
182 }
183
184 self.enqueue_summarization_task();
188
189 let has_tool_result_parts = parts
192 .iter()
193 .any(|p| matches!(p, MessagePart::ToolResult { .. }));
194
195 self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
196 .await;
197
198 if role == Role::User && !has_tool_result_parts && !has_injection_flags {
200 self.enqueue_persona_extraction_task();
201 }
202
203 if has_tool_result_parts {
205 self.enqueue_trajectory_extraction_task();
206 }
207
208 let has_tool_use_parts = parts
213 .iter()
214 .any(|p| matches!(p, MessagePart::ToolUse { .. }));
215 if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
216 self.enqueue_reasoning_extraction_task();
217 self.enqueue_memcot_distill_task(content);
219 }
220 }
221
222 fn enqueue_memcot_distill_task(&mut self, assistant_content: &str) {
227 let Some(accumulator) = &self.services.memory.extraction.memcot_accumulator else {
228 return;
229 };
230 let distill_provider_name = self
231 .services
232 .memory
233 .extraction
234 .memcot_config
235 .distill_provider
236 .as_str();
237 let provider = self.resolve_background_provider(distill_provider_name);
238
239 let content = assistant_content.to_owned();
240 let supervisor = &mut self.runtime.lifecycle.supervisor;
241
242 accumulator.maybe_enqueue_distill(&content, provider, |name, fut| {
243 supervisor.spawn(super::agent_supervisor::TaskClass::Enrichment, name, fut);
244 });
245 }
246
247 fn enqueue_summarization_task(&mut self) {
249 let (Some(memory), Some(cid)) = (
250 self.services.memory.persistence.memory.clone(),
251 self.services.memory.persistence.conversation_id,
252 ) else {
253 return;
254 };
255
256 if self.services.memory.persistence.unsummarized_count
257 <= self.services.memory.compaction.summarization_threshold
258 {
259 return;
260 }
261
262 let batch_size = self.services.memory.compaction.summarization_threshold / 2;
263
264 self.runtime.lifecycle.supervisor.spawn_summarization("summarization", async move {
265 match tokio::time::timeout(
266 std::time::Duration::from_secs(30),
267 memory.summarize(cid, batch_size),
268 )
269 .await
270 {
271 Ok(Ok(Some(summary_id))) => {
272 tracing::info!(
273 "background summarization: created summary {summary_id} for conversation {cid}"
274 );
275 true
276 }
277 Ok(Ok(None)) => {
278 tracing::debug!("background summarization: no summarization needed");
279 false
280 }
281 Ok(Err(e)) => {
282 tracing::error!("background summarization failed: {e:#}");
283 false
284 }
285 Err(_) => {
286 tracing::warn!("background summarization timed out after 30s");
287 false
288 }
289 }
290 });
291 }
292
293 #[tracing::instrument(
298 name = "core.persist.enqueue_graph_extraction",
299 skip_all,
300 level = "debug"
301 )]
302 async fn enqueue_graph_extraction_task(
303 &mut self,
304 content: &str,
305 has_injection_flags: bool,
306 has_tool_result_parts: bool,
307 ) {
308 if self.services.memory.persistence.memory.is_none()
309 || self.services.memory.persistence.conversation_id.is_none()
310 {
311 return;
312 }
313 if has_tool_result_parts {
314 tracing::debug!("graph extraction skipped: message contains ToolResult parts");
315 return;
316 }
317 if has_injection_flags {
318 tracing::warn!("graph extraction skipped: injection patterns detected in content");
319 return;
320 }
321
322 let cfg = &self.services.memory.extraction.graph_config;
323 if !cfg.enabled {
324 return;
325 }
326 let embed_timeout_secs = self
327 .services
328 .memory
329 .persistence
330 .memory
331 .as_ref()
332 .map_or(5, |m| m.embed_timeout().as_secs());
333 let extraction_cfg = build_graph_extraction_config(
334 cfg,
335 self.services
336 .memory
337 .persistence
338 .conversation_id
339 .map(|c| c.0),
340 embed_timeout_secs,
341 );
342 let extract_provider_name = cfg.extract_provider.as_str().to_owned();
345
346 if self.rpe_should_skip(content).await {
349 tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
350 return;
351 }
352
353 let context_messages = collect_context_messages(&self.msg.messages);
354
355 let Some(memory) = self.services.memory.persistence.memory.clone() else {
356 return;
357 };
358
359 let validator: zeph_memory::semantic::PostExtractValidator =
360 if self.services.security.memory_validator.is_enabled() {
361 let v = self.services.security.memory_validator.clone();
362 Some(Box::new(move |result| {
363 v.validate_graph_extraction(result)
364 .map_err(|e| e.to_string())
365 }))
366 } else {
367 None
368 };
369
370 let provider_override = if extract_provider_name.is_empty() {
371 None
372 } else {
373 Some(self.resolve_background_provider(&extract_provider_name))
374 };
375
376 self.spawn_graph_extraction_task(
377 memory,
378 content,
379 context_messages,
380 extraction_cfg,
381 validator,
382 provider_override,
383 );
384
385 self.sync_community_detection_failures();
387 self.sync_graph_extraction_metrics();
388 self.enqueue_graph_count_sync_task();
389 }
390
391 fn spawn_graph_extraction_task(
392 &mut self,
393 memory: std::sync::Arc<zeph_memory::semantic::SemanticMemory>,
394 content: &str,
395 context_messages: Vec<String>,
396 extraction_cfg: zeph_memory::semantic::GraphExtractionConfig,
397 validator: zeph_memory::semantic::PostExtractValidator,
398 provider_override: Option<zeph_llm::any::AnyProvider>,
399 ) {
400 let content_owned = content.to_owned();
401 let graph_store = memory.graph_store.clone();
402 let metrics_tx = self.runtime.metrics.metrics_tx.clone();
403 let start_time = self.runtime.lifecycle.start_time;
404 let cancel = self.runtime.lifecycle.cancel_token.child_token();
405
406 self.runtime.lifecycle.supervisor.spawn(
407 super::agent_supervisor::TaskClass::Enrichment,
408 "graph_extraction",
409 async move {
410 let extraction_handle = memory.spawn_graph_extraction(
411 content_owned,
412 context_messages,
413 extraction_cfg,
414 validator,
415 provider_override,
416 cancel,
417 );
418
419 if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
421 let _ = extraction_handle.await;
422 let (entities, edges, communities) = tokio::join!(
423 store.entity_count(),
424 store.active_edge_count(),
425 store.community_count()
426 );
427 let elapsed = start_time.elapsed().as_secs();
428 tx.send_modify(|m| {
429 m.uptime_seconds = elapsed;
430 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
431 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
432 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
433 });
434 } else {
435 let _ = extraction_handle.await;
436 }
437
438 tracing::debug!("background graph extraction complete");
439 },
440 );
441 }
442
443 fn enqueue_graph_count_sync_task(&mut self) {
445 let memory_for_sync = self.services.memory.persistence.memory.clone();
446 let metrics_tx_sync = self.runtime.metrics.metrics_tx.clone();
447 let start_time_sync = self.runtime.lifecycle.start_time;
448 let cid_sync = self.services.memory.persistence.conversation_id;
449 let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
450 let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
451 let guidelines_enabled = self.services.memory.extraction.graph_config.enabled;
452
453 self.runtime.lifecycle.supervisor.spawn(
454 super::agent_supervisor::TaskClass::Telemetry,
455 "graph_count_sync",
456 async move {
457 let Some(store) = graph_store_sync else {
458 return;
459 };
460 let Some(tx) = metrics_tx_sync else { return };
461
462 let (entities, edges, communities) = tokio::join!(
463 store.entity_count(),
464 store.active_edge_count(),
465 store.community_count()
466 );
467 let elapsed = start_time_sync.elapsed().as_secs();
468 tx.send_modify(|m| {
469 m.uptime_seconds = elapsed;
470 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
471 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
472 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
473 });
474
475 if guidelines_enabled && let Some(sqlite) = sqlite_sync {
477 match tokio::time::timeout(
478 std::time::Duration::from_secs(10),
479 sqlite.load_compression_guidelines_meta(cid_sync),
480 )
481 .await
482 {
483 Ok(Ok((version, created_at))) => {
484 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
485 let version_u32 = u32::try_from(version).unwrap_or(0);
486 tx.send_modify(|m| {
487 m.guidelines_version = version_u32;
488 m.guidelines_updated_at = created_at;
489 });
490 }
491 Ok(Err(e)) => {
492 tracing::debug!("guidelines status sync failed: {e:#}");
493 }
494 Err(_) => {
495 tracing::debug!("guidelines status sync timed out");
496 }
497 }
498 }
499 },
500 );
501 }
502
503 fn enqueue_persona_extraction_task(&mut self) {
505 use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
506
507 let cfg = &self.services.memory.extraction.persona_config;
508 if !cfg.enabled {
509 return;
510 }
511
512 let Some(memory) = &self.services.memory.persistence.memory else {
513 return;
514 };
515
516 let user_messages: Vec<String> = self
517 .msg
518 .messages
519 .iter()
520 .filter(|m| {
521 m.role == Role::User
522 && !m
523 .parts
524 .iter()
525 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
526 })
527 .take(8)
528 .map(|m| {
529 if m.content.len() > 2048 {
530 m.content[..m.content.floor_char_boundary(2048)].to_owned()
531 } else {
532 m.content.clone()
533 }
534 })
535 .collect();
536
537 if user_messages.len() < cfg.min_messages {
538 return;
539 }
540
541 let timeout_secs = cfg.extraction_timeout_secs;
542 let extraction_cfg = PersonaExtractionConfig {
543 enabled: cfg.enabled,
544 min_messages: cfg.min_messages,
545 max_messages: cfg.max_messages,
546 extraction_timeout_secs: timeout_secs,
547 };
548
549 let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
550 let store = memory.sqlite().clone();
551 let conversation_id = self
552 .services
553 .memory
554 .persistence
555 .conversation_id
556 .map(|c| c.0);
557
558 self.runtime.lifecycle.supervisor.spawn(
559 super::agent_supervisor::TaskClass::Enrichment,
560 "persona_extraction",
561 async move {
562 let user_message_refs: Vec<&str> =
563 user_messages.iter().map(String::as_str).collect();
564 let fut = extract_persona_facts(
565 &store,
566 &provider,
567 &user_message_refs,
568 &extraction_cfg,
569 conversation_id,
570 );
571 match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
572 {
573 Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
574 Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
575 Err(_) => tracing::warn!(
576 timeout_secs,
577 "persona extraction timed out — no facts written this turn"
578 ),
579 }
580 },
581 );
582 }
583
584 fn enqueue_trajectory_extraction_task(&mut self) {
586 use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
587
588 let cfg = self.services.memory.extraction.trajectory_config.clone();
589 if !cfg.enabled {
590 return;
591 }
592
593 let Some(memory) = &self.services.memory.persistence.memory else {
594 return;
595 };
596
597 let conversation_id = match self.services.memory.persistence.conversation_id {
598 Some(cid) => cid.0,
599 None => return,
600 };
601
602 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
603 let turn_messages: Vec<zeph_llm::provider::Message> =
604 self.msg.messages[tail_start..].to_vec();
605
606 if turn_messages.is_empty() {
607 return;
608 }
609
610 let extraction_cfg = TrajectoryExtractionConfig {
611 enabled: cfg.enabled,
612 max_messages: cfg.max_messages,
613 extraction_timeout_secs: cfg.extraction_timeout_secs,
614 };
615
616 let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
617 let store = memory.sqlite().clone();
618 let min_confidence = cfg.min_confidence;
619
620 self.runtime.lifecycle.supervisor.spawn(
621 super::agent_supervisor::TaskClass::Enrichment,
622 "trajectory_extraction",
623 async move {
624 let entries =
625 match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
626 .await
627 {
628 Ok(e) => e,
629 Err(e) => {
630 tracing::warn!(error = %e, "trajectory extraction failed");
631 return;
632 }
633 };
634
635 let last_id = store
636 .trajectory_last_extracted_message_id(conversation_id)
637 .await
638 .unwrap_or(0);
639
640 let mut max_id = last_id;
641 for entry in &entries {
642 if entry.confidence < min_confidence {
643 continue;
644 }
645 let tools_json = serde_json::to_string(&entry.tools_used)
646 .unwrap_or_else(|_| "[]".to_string());
647 match store
648 .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
649 conversation_id: Some(conversation_id),
650 turn_index: 0,
651 kind: &entry.kind,
652 intent: &entry.intent,
653 outcome: &entry.outcome,
654 tools_used: &tools_json,
655 confidence: entry.confidence,
656 })
657 .await
658 {
659 Ok(id) => {
660 if id > max_id {
661 max_id = id;
662 }
663 }
664 Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
665 }
666 }
667
668 if max_id > last_id {
669 let _ = store
670 .set_trajectory_last_extracted_message_id(conversation_id, max_id)
671 .await;
672 }
673
674 tracing::debug!(
675 count = entries.len(),
676 conversation_id,
677 "trajectory extraction complete"
678 );
679 },
680 );
681 }
682
683 fn enqueue_reasoning_extraction_task(&mut self) {
688 let cfg = self.services.memory.extraction.reasoning_config.clone();
689 if !cfg.enabled {
690 return;
691 }
692
693 let Some(memory) = &self.services.memory.persistence.memory else {
694 return;
695 };
696
697 let Some(reasoning) = memory.reasoning.clone() else {
698 return;
699 };
700
701 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
702 let turn_messages: Vec<zeph_llm::provider::Message> =
703 self.msg.messages[tail_start..].to_vec();
704
705 if turn_messages.len() < cfg.min_messages {
706 return;
707 }
708
709 let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
710 let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
711 let embed_provider = memory.effective_embed_provider().clone();
712 let store_limit = cfg.store_limit;
713 let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
714 let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
715 let embed_timeout = memory.embed_timeout();
716 let self_judge_window = cfg.self_judge_window;
717 let min_assistant_chars = cfg.min_assistant_chars;
718
719 self.runtime.lifecycle.supervisor.spawn(
720 super::agent_supervisor::TaskClass::Enrichment,
721 "reasoning_extraction",
722 async move {
723 if let Err(e) = zeph_memory::process_reasoning_turn(
724 &reasoning,
725 &extract_provider,
726 &distill_provider,
727 &embed_provider,
728 &turn_messages,
729 zeph_memory::ProcessTurnConfig {
730 store_limit,
731 extraction_timeout,
732 distill_timeout,
733 embed_timeout,
734 self_judge_window,
735 min_assistant_chars,
736 },
737 )
738 .await
739 {
740 tracing::warn!(error = %e, "reasoning: process_turn failed");
741 }
742
743 tracing::debug!("reasoning extraction complete");
744 },
745 );
746 }
747
748 async fn rpe_should_skip(&mut self, content: &str) -> bool {
753 let Some(ref rpe_mutex) = self.services.memory.extraction.rpe_router else {
754 return false;
755 };
756 let Some(memory) = &self.services.memory.persistence.memory else {
757 return false;
758 };
759 let candidates = zeph_memory::extract_candidate_entities(content);
760 let provider = memory.provider();
761 let Ok(Ok(emb_vec)) =
762 tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
763 else {
764 return false; };
766 if let Ok(mut router) = rpe_mutex.lock() {
767 let signal = router.compute(&emb_vec, &candidates);
768 router.push_embedding(emb_vec);
769 router.push_entities(&candidates);
770 !signal.should_extract
771 } else {
772 tracing::warn!("rpe_router mutex poisoned; falling through to extract");
773 false
774 }
775 }
776}
777
778#[cfg(test)]
779mod tests {
780 use super::super::agent_tests::{
781 MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
782 };
783 use super::*;
784 use zeph_llm::any::AnyProvider;
785 use zeph_llm::provider::Message;
786 use zeph_memory::semantic::SemanticMemory;
787
788 async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
789 SemanticMemory::new(
790 ":memory:",
791 "http://127.0.0.1:1",
792 None,
793 provider.clone(),
794 "test-model",
795 )
796 .await
797 .unwrap()
798 }
799
800 #[tokio::test]
801 async fn load_history_without_memory_returns_ok() {
802 let provider = mock_provider(vec![]);
803 let channel = MockChannel::new(vec![]);
804 let registry = create_test_registry();
805 let executor = MockToolExecutor::no_tools();
806 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
807
808 let result = agent.load_history().await;
809 assert!(result.is_ok());
810 assert_eq!(agent.msg.messages.len(), 1); }
813
814 #[tokio::test]
815 async fn load_history_with_messages_injects_into_agent() {
816 let provider = mock_provider(vec![]);
817 let channel = MockChannel::new(vec![]);
818 let registry = create_test_registry();
819 let executor = MockToolExecutor::no_tools();
820
821 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
822 let cid = memory.sqlite().create_conversation().await.unwrap();
823
824 memory
825 .sqlite()
826 .save_message(cid, "user", "hello from history")
827 .await
828 .unwrap();
829 memory
830 .sqlite()
831 .save_message(cid, "assistant", "hi back")
832 .await
833 .unwrap();
834
835 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
836 std::sync::Arc::new(memory),
837 cid,
838 50,
839 5,
840 100,
841 );
842
843 let messages_before = agent.msg.messages.len();
844 agent.load_history().await.unwrap();
845 assert_eq!(agent.msg.messages.len(), messages_before + 2);
847 }
848
849 #[tokio::test]
850 async fn load_history_skips_empty_messages() {
851 let provider = mock_provider(vec![]);
852 let channel = MockChannel::new(vec![]);
853 let registry = create_test_registry();
854 let executor = MockToolExecutor::no_tools();
855
856 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
857 let cid = memory.sqlite().create_conversation().await.unwrap();
858
859 memory
861 .sqlite()
862 .save_message(cid, "user", " ")
863 .await
864 .unwrap();
865 memory
866 .sqlite()
867 .save_message(cid, "user", "real message")
868 .await
869 .unwrap();
870
871 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
872 std::sync::Arc::new(memory),
873 cid,
874 50,
875 5,
876 100,
877 );
878
879 let messages_before = agent.msg.messages.len();
880 agent.load_history().await.unwrap();
881 assert_eq!(agent.msg.messages.len(), messages_before + 1);
883 }
884
885 #[tokio::test]
886 async fn load_history_with_empty_store_returns_ok() {
887 let provider = mock_provider(vec![]);
888 let channel = MockChannel::new(vec![]);
889 let registry = create_test_registry();
890 let executor = MockToolExecutor::no_tools();
891
892 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
893 let cid = memory.sqlite().create_conversation().await.unwrap();
894
895 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
896 std::sync::Arc::new(memory),
897 cid,
898 50,
899 5,
900 100,
901 );
902
903 let messages_before = agent.msg.messages.len();
904 agent.load_history().await.unwrap();
905 assert_eq!(agent.msg.messages.len(), messages_before);
907 }
908
909 #[tokio::test]
910 async fn load_history_increments_session_count_for_existing_messages() {
911 let provider = mock_provider(vec![]);
912 let channel = MockChannel::new(vec![]);
913 let registry = create_test_registry();
914 let executor = MockToolExecutor::no_tools();
915
916 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
917 let cid = memory.sqlite().create_conversation().await.unwrap();
918
919 let id1 = memory
921 .sqlite()
922 .save_message(cid, "user", "hello")
923 .await
924 .unwrap();
925 let id2 = memory
926 .sqlite()
927 .save_message(cid, "assistant", "hi")
928 .await
929 .unwrap();
930
931 let memory_arc = std::sync::Arc::new(memory);
932 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
933 memory_arc.clone(),
934 cid,
935 50,
936 5,
937 100,
938 );
939
940 agent.load_history().await.unwrap();
941
942 let counts: Vec<i64> = zeph_db::query_scalar(
944 "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
945 )
946 .bind(id1)
947 .bind(id2)
948 .fetch_all(memory_arc.sqlite().pool())
949 .await
950 .unwrap();
951 assert_eq!(
952 counts,
953 vec![1, 1],
954 "session_count must be 1 after first restore"
955 );
956 }
957
958 #[tokio::test]
959 async fn load_history_does_not_increment_session_count_for_new_conversation() {
960 let provider = mock_provider(vec![]);
961 let channel = MockChannel::new(vec![]);
962 let registry = create_test_registry();
963 let executor = MockToolExecutor::no_tools();
964
965 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
966 let cid = memory.sqlite().create_conversation().await.unwrap();
967
968 let memory_arc = std::sync::Arc::new(memory);
970 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
971 memory_arc.clone(),
972 cid,
973 50,
974 5,
975 100,
976 );
977
978 agent.load_history().await.unwrap();
979
980 let counts: Vec<i64> =
982 zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
983 .bind(cid)
984 .fetch_all(memory_arc.sqlite().pool())
985 .await
986 .unwrap();
987 assert!(counts.is_empty(), "new conversation must have no messages");
988 }
989
990 #[tokio::test]
991 async fn persist_message_without_memory_silently_returns() {
992 let provider = mock_provider(vec![]);
994 let channel = MockChannel::new(vec![]);
995 let registry = create_test_registry();
996 let executor = MockToolExecutor::no_tools();
997 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
998
999 agent.persist_message(Role::User, "hello", &[], false).await;
1001 }
1002
1003 #[tokio::test]
1004 async fn persist_message_assistant_autosave_false_uses_save_only() {
1005 let provider = mock_provider(vec![]);
1006 let channel = MockChannel::new(vec![]);
1007 let registry = create_test_registry();
1008 let executor = MockToolExecutor::no_tools();
1009
1010 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1011 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1012 let cid = memory.sqlite().create_conversation().await.unwrap();
1013
1014 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1015 .with_metrics(tx)
1016 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1017 agent.services.memory.persistence.autosave_assistant = false;
1018 agent.services.memory.persistence.autosave_min_length = 20;
1019
1020 agent
1021 .persist_message(Role::Assistant, "short assistant reply", &[], false)
1022 .await;
1023
1024 let history = agent
1025 .services
1026 .memory
1027 .persistence
1028 .memory
1029 .as_ref()
1030 .unwrap()
1031 .sqlite()
1032 .load_history(cid, 50)
1033 .await
1034 .unwrap();
1035 assert_eq!(history.len(), 1, "message must be saved");
1036 assert_eq!(history[0].content, "short assistant reply");
1037 assert_eq!(rx.borrow().embeddings_generated, 0);
1039 }
1040
1041 #[tokio::test]
1042 async fn persist_message_assistant_below_min_length_uses_save_only() {
1043 let provider = mock_provider(vec![]);
1044 let channel = MockChannel::new(vec![]);
1045 let registry = create_test_registry();
1046 let executor = MockToolExecutor::no_tools();
1047
1048 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1049 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1050 let cid = memory.sqlite().create_conversation().await.unwrap();
1051
1052 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1054 .with_metrics(tx)
1055 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1056 agent.services.memory.persistence.autosave_assistant = true;
1057 agent.services.memory.persistence.autosave_min_length = 1000;
1058
1059 agent
1060 .persist_message(Role::Assistant, "too short", &[], false)
1061 .await;
1062
1063 let history = agent
1064 .services
1065 .memory
1066 .persistence
1067 .memory
1068 .as_ref()
1069 .unwrap()
1070 .sqlite()
1071 .load_history(cid, 50)
1072 .await
1073 .unwrap();
1074 assert_eq!(history.len(), 1, "message must be saved");
1075 assert_eq!(history[0].content, "too short");
1076 assert_eq!(rx.borrow().embeddings_generated, 0);
1077 }
1078
1079 #[tokio::test]
1080 async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1081 let provider = mock_provider(vec![]);
1083 let channel = MockChannel::new(vec![]);
1084 let registry = create_test_registry();
1085 let executor = MockToolExecutor::no_tools();
1086
1087 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1088 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1089 let cid = memory.sqlite().create_conversation().await.unwrap();
1090
1091 let min_length = 10usize;
1092 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1093 .with_metrics(tx)
1094 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1095 agent.services.memory.persistence.autosave_assistant = true;
1096 agent.services.memory.persistence.autosave_min_length = min_length;
1097
1098 let content_at_boundary = "A".repeat(min_length);
1100 assert_eq!(content_at_boundary.len(), min_length);
1101 agent
1102 .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1103 .await;
1104
1105 assert_eq!(rx.borrow().sqlite_message_count, 1);
1107 }
1108
1109 #[tokio::test]
1110 async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1111 let provider = mock_provider(vec![]);
1113 let channel = MockChannel::new(vec![]);
1114 let registry = create_test_registry();
1115 let executor = MockToolExecutor::no_tools();
1116
1117 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1118 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1119 let cid = memory.sqlite().create_conversation().await.unwrap();
1120
1121 let min_length = 10usize;
1122 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1123 .with_metrics(tx)
1124 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1125 agent.services.memory.persistence.autosave_assistant = true;
1126 agent.services.memory.persistence.autosave_min_length = min_length;
1127
1128 let content_below_boundary = "A".repeat(min_length - 1);
1130 assert_eq!(content_below_boundary.len(), min_length - 1);
1131 agent
1132 .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1133 .await;
1134
1135 let history = agent
1136 .services
1137 .memory
1138 .persistence
1139 .memory
1140 .as_ref()
1141 .unwrap()
1142 .sqlite()
1143 .load_history(cid, 50)
1144 .await
1145 .unwrap();
1146 assert_eq!(history.len(), 1, "message must still be saved");
1147 assert_eq!(rx.borrow().embeddings_generated, 0);
1149 }
1150
1151 #[tokio::test]
1152 async fn persist_message_increments_unsummarized_count() {
1153 let provider = mock_provider(vec![]);
1154 let channel = MockChannel::new(vec![]);
1155 let registry = create_test_registry();
1156 let executor = MockToolExecutor::no_tools();
1157
1158 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1159 let cid = memory.sqlite().create_conversation().await.unwrap();
1160
1161 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1163 std::sync::Arc::new(memory),
1164 cid,
1165 50,
1166 5,
1167 100,
1168 );
1169
1170 assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1171
1172 agent.persist_message(Role::User, "first", &[], false).await;
1173 assert_eq!(agent.services.memory.persistence.unsummarized_count, 1);
1174
1175 agent
1176 .persist_message(Role::User, "second", &[], false)
1177 .await;
1178 assert_eq!(agent.services.memory.persistence.unsummarized_count, 2);
1179 }
1180
1181 #[tokio::test]
1182 async fn check_summarization_resets_counter_on_success() {
1183 let provider = mock_provider(vec![]);
1184 let channel = MockChannel::new(vec![]);
1185 let registry = create_test_registry();
1186 let executor = MockToolExecutor::no_tools();
1187
1188 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1189 let cid = memory.sqlite().create_conversation().await.unwrap();
1190
1191 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1193 std::sync::Arc::new(memory),
1194 cid,
1195 50,
1196 5,
1197 1,
1198 );
1199
1200 agent.persist_message(Role::User, "msg1", &[], false).await;
1201 agent.persist_message(Role::User, "msg2", &[], false).await;
1202
1203 assert!(agent.services.memory.persistence.unsummarized_count <= 2);
1208 }
1209
1210 #[tokio::test]
1211 async fn unsummarized_count_not_incremented_without_memory() {
1212 let provider = mock_provider(vec![]);
1213 let channel = MockChannel::new(vec![]);
1214 let registry = create_test_registry();
1215 let executor = MockToolExecutor::no_tools();
1216 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1217
1218 agent.persist_message(Role::User, "hello", &[], false).await;
1219 assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1221 }
1222
1223 mod graph_extraction_guards {
1225 use super::*;
1226 use crate::config::GraphConfig;
1227 use zeph_llm::provider::MessageMetadata;
1228 use zeph_memory::graph::GraphStore;
1229
1230 fn enabled_graph_config() -> GraphConfig {
1231 GraphConfig {
1232 enabled: true,
1233 ..GraphConfig::default()
1234 }
1235 }
1236
1237 async fn agent_with_graph(
1238 provider: &AnyProvider,
1239 config: GraphConfig,
1240 ) -> Agent<MockChannel> {
1241 let memory =
1242 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1243 let cid = memory.sqlite().create_conversation().await.unwrap();
1244 Agent::new(
1245 provider.clone(),
1246 MockChannel::new(vec![]),
1247 create_test_registry(),
1248 None,
1249 5,
1250 MockToolExecutor::no_tools(),
1251 )
1252 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1253 .with_graph_config(config)
1254 }
1255
1256 #[tokio::test]
1257 async fn injection_flag_guard_skips_extraction() {
1258 let provider = mock_provider(vec![]);
1260 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1261 let pool = agent
1262 .services
1263 .memory
1264 .persistence
1265 .memory
1266 .as_ref()
1267 .unwrap()
1268 .sqlite()
1269 .pool()
1270 .clone();
1271
1272 agent
1273 .enqueue_graph_extraction_task("I use Rust", true, false)
1274 .await;
1275
1276 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1278
1279 let store = GraphStore::new(pool);
1280 let count = store.get_metadata("extraction_count").await.unwrap();
1281 assert!(
1282 count.is_none(),
1283 "injection flag must prevent extraction_count from being written"
1284 );
1285 }
1286
1287 #[tokio::test]
1288 async fn disabled_config_guard_skips_extraction() {
1289 let provider = mock_provider(vec![]);
1291 let disabled_cfg = GraphConfig {
1292 enabled: false,
1293 ..GraphConfig::default()
1294 };
1295 let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1296 let pool = agent
1297 .services
1298 .memory
1299 .persistence
1300 .memory
1301 .as_ref()
1302 .unwrap()
1303 .sqlite()
1304 .pool()
1305 .clone();
1306
1307 agent
1308 .enqueue_graph_extraction_task("I use Rust", false, false)
1309 .await;
1310
1311 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1312
1313 let store = GraphStore::new(pool);
1314 let count = store.get_metadata("extraction_count").await.unwrap();
1315 assert!(
1316 count.is_none(),
1317 "disabled graph config must prevent extraction"
1318 );
1319 }
1320
1321 #[tokio::test]
1322 async fn happy_path_fires_extraction() {
1323 let provider = mock_provider(vec![]);
1326 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1327 let pool = agent
1328 .services
1329 .memory
1330 .persistence
1331 .memory
1332 .as_ref()
1333 .unwrap()
1334 .sqlite()
1335 .pool()
1336 .clone();
1337
1338 agent
1339 .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1340 .await;
1341
1342 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1344
1345 let store = GraphStore::new(pool);
1346 let count = store.get_metadata("extraction_count").await.unwrap();
1347 assert!(
1348 count.is_some(),
1349 "happy-path extraction must increment extraction_count"
1350 );
1351 }
1352
1353 #[tokio::test]
1354 async fn tool_result_parts_guard_skips_extraction() {
1355 let provider = mock_provider(vec![]);
1359 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1360 let pool = agent
1361 .services
1362 .memory
1363 .persistence
1364 .memory
1365 .as_ref()
1366 .unwrap()
1367 .sqlite()
1368 .pool()
1369 .clone();
1370
1371 agent
1372 .enqueue_graph_extraction_task(
1373 "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1374 false,
1375 true, )
1377 .await;
1378
1379 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1380
1381 let store = GraphStore::new(pool);
1382 let count = store.get_metadata("extraction_count").await.unwrap();
1383 assert!(
1384 count.is_none(),
1385 "tool result message must not trigger graph extraction"
1386 );
1387 }
1388
1389 #[tokio::test]
1390 async fn context_filter_excludes_tool_result_messages() {
1391 let provider = mock_provider(vec![]);
1402 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1403
1404 agent.msg.messages.push(Message {
1407 role: Role::User,
1408 content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1409 parts: vec![MessagePart::ToolResult {
1410 tool_use_id: "abc".to_owned(),
1411 content: "provider_type = \"openai\"".to_owned(),
1412 is_error: false,
1413 }],
1414 metadata: MessageMetadata::default(),
1415 });
1416
1417 let pool = agent
1418 .services
1419 .memory
1420 .persistence
1421 .memory
1422 .as_ref()
1423 .unwrap()
1424 .sqlite()
1425 .pool()
1426 .clone();
1427
1428 agent
1430 .enqueue_graph_extraction_task(
1431 "I prefer Rust for systems programming",
1432 false,
1433 false,
1434 )
1435 .await;
1436
1437 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1438
1439 let store = GraphStore::new(pool);
1441 let count = store.get_metadata("extraction_count").await.unwrap();
1442 assert!(
1443 count.is_some(),
1444 "conversational message must trigger extraction even with prior tool result in history"
1445 );
1446 }
1447 }
1448
1449 mod persona_extraction_guards {
1451 use super::*;
1452 use zeph_config::PersonaConfig;
1453 use zeph_llm::provider::MessageMetadata;
1454
1455 fn enabled_persona_config() -> PersonaConfig {
1456 PersonaConfig {
1457 enabled: true,
1458 min_messages: 1,
1459 ..PersonaConfig::default()
1460 }
1461 }
1462
1463 async fn agent_with_persona(
1464 provider: &AnyProvider,
1465 config: PersonaConfig,
1466 ) -> Agent<MockChannel> {
1467 let memory =
1468 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1469 let cid = memory.sqlite().create_conversation().await.unwrap();
1470 let mut agent = Agent::new(
1471 provider.clone(),
1472 MockChannel::new(vec![]),
1473 create_test_registry(),
1474 None,
1475 5,
1476 MockToolExecutor::no_tools(),
1477 )
1478 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1479 agent.services.memory.extraction.persona_config = config;
1480 agent
1481 }
1482
1483 #[tokio::test]
1484 async fn disabled_config_skips_spawn() {
1485 let provider = mock_provider(vec![]);
1487 let mut agent = agent_with_persona(
1488 &provider,
1489 PersonaConfig {
1490 enabled: false,
1491 ..PersonaConfig::default()
1492 },
1493 )
1494 .await;
1495
1496 agent.msg.messages.push(zeph_llm::provider::Message {
1498 role: Role::User,
1499 content: "I prefer Rust for systems programming".to_owned(),
1500 parts: vec![],
1501 metadata: MessageMetadata::default(),
1502 });
1503
1504 agent.enqueue_persona_extraction_task();
1505
1506 let store = agent
1507 .services
1508 .memory
1509 .persistence
1510 .memory
1511 .as_ref()
1512 .unwrap()
1513 .sqlite()
1514 .clone();
1515 let count = store.count_persona_facts().await.unwrap();
1516 assert_eq!(count, 0, "disabled persona config must not write any facts");
1517 }
1518
1519 #[tokio::test]
1520 async fn below_min_messages_skips_spawn() {
1521 let provider = mock_provider(vec![]);
1523 let mut agent = agent_with_persona(
1524 &provider,
1525 PersonaConfig {
1526 enabled: true,
1527 min_messages: 3,
1528 ..PersonaConfig::default()
1529 },
1530 )
1531 .await;
1532
1533 for text in ["I use Rust", "I prefer async code"] {
1534 agent.msg.messages.push(zeph_llm::provider::Message {
1535 role: Role::User,
1536 content: text.to_owned(),
1537 parts: vec![],
1538 metadata: MessageMetadata::default(),
1539 });
1540 }
1541
1542 agent.enqueue_persona_extraction_task();
1543
1544 let store = agent
1545 .services
1546 .memory
1547 .persistence
1548 .memory
1549 .as_ref()
1550 .unwrap()
1551 .sqlite()
1552 .clone();
1553 let count = store.count_persona_facts().await.unwrap();
1554 assert_eq!(
1555 count, 0,
1556 "below min_messages threshold must not trigger extraction"
1557 );
1558 }
1559
1560 #[tokio::test]
1561 async fn no_memory_skips_spawn() {
1562 let provider = mock_provider(vec![]);
1564 let channel = MockChannel::new(vec![]);
1565 let registry = create_test_registry();
1566 let executor = MockToolExecutor::no_tools();
1567 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1568 agent.services.memory.extraction.persona_config = enabled_persona_config();
1569 agent.msg.messages.push(zeph_llm::provider::Message {
1570 role: Role::User,
1571 content: "I like Rust".to_owned(),
1572 parts: vec![],
1573 metadata: MessageMetadata::default(),
1574 });
1575
1576 agent.enqueue_persona_extraction_task();
1578 }
1579
1580 #[tokio::test]
1581 async fn enabled_enough_messages_spawns_extraction() {
1582 use zeph_llm::mock::MockProvider;
1585 let (mock, recorded) = MockProvider::default().with_recording();
1586 let provider = AnyProvider::Mock(mock);
1587 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1588
1589 agent.msg.messages.push(zeph_llm::provider::Message {
1590 role: Role::User,
1591 content: "I prefer Rust for systems programming".to_owned(),
1592 parts: vec![],
1593 metadata: MessageMetadata::default(),
1594 });
1595
1596 agent.enqueue_persona_extraction_task();
1597
1598 agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1600
1601 let calls = recorded.lock().unwrap();
1602 assert!(
1603 !calls.is_empty(),
1604 "happy-path: provider.chat() must be called when extraction completes"
1605 );
1606 }
1607
1608 #[tokio::test]
1609 async fn messages_capped_at_eight() {
1610 use zeph_llm::mock::MockProvider;
1613 let (mock, recorded) = MockProvider::default().with_recording();
1614 let provider = AnyProvider::Mock(mock);
1615 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1616
1617 for i in 0..12u32 {
1618 agent.msg.messages.push(zeph_llm::provider::Message {
1619 role: Role::User,
1620 content: format!("I like message {i}"),
1621 parts: vec![],
1622 metadata: MessageMetadata::default(),
1623 });
1624 }
1625
1626 agent.enqueue_persona_extraction_task();
1627
1628 agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1630
1631 let calls = recorded.lock().unwrap();
1633 assert!(
1634 !calls.is_empty(),
1635 "extraction must run when enough messages present"
1636 );
1637 let prompt = &calls[0];
1639 let user_text = prompt
1640 .iter()
1641 .filter(|m| m.role == Role::User)
1642 .map(|m| m.content.as_str())
1643 .collect::<Vec<_>>()
1644 .join(" ");
1645 assert!(
1647 !user_text.contains("I like message 8"),
1648 "message index 8 must be excluded from extraction input"
1649 );
1650 }
1651
1652 #[test]
1653 fn long_message_truncated_at_char_boundary() {
1654 let long_content = "x".repeat(3000);
1658 let truncated = if long_content.len() > 2048 {
1659 long_content[..long_content.floor_char_boundary(2048)].to_owned()
1660 } else {
1661 long_content.clone()
1662 };
1663 assert_eq!(
1664 truncated.len(),
1665 2048,
1666 "ASCII content must be truncated to exactly 2048 bytes"
1667 );
1668
1669 let multi = "é".repeat(1500); let truncated_multi = if multi.len() > 2048 {
1672 multi[..multi.floor_char_boundary(2048)].to_owned()
1673 } else {
1674 multi.clone()
1675 };
1676 assert!(
1677 truncated_multi.len() <= 2048,
1678 "multi-byte content must not exceed 2048 bytes"
1679 );
1680 assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1681 }
1682 }
1683
1684 #[tokio::test]
1685 async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1686 let provider = mock_provider(vec![]);
1687 let channel = MockChannel::new(vec![]);
1688 let registry = create_test_registry();
1689 let executor = MockToolExecutor::no_tools();
1690
1691 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1692 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1693 let cid = memory.sqlite().create_conversation().await.unwrap();
1694
1695 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1697 .with_metrics(tx)
1698 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1699 agent.services.memory.persistence.autosave_assistant = false;
1700 agent.services.memory.persistence.autosave_min_length = 20;
1701
1702 let long_user_msg = "A".repeat(100);
1703 agent
1704 .persist_message(Role::User, &long_user_msg, &[], false)
1705 .await;
1706
1707 let history = agent
1708 .services
1709 .memory
1710 .persistence
1711 .memory
1712 .as_ref()
1713 .unwrap()
1714 .sqlite()
1715 .load_history(cid, 50)
1716 .await
1717 .unwrap();
1718 assert_eq!(history.len(), 1, "user message must be saved");
1719 assert_eq!(rx.borrow().sqlite_message_count, 1);
1722 }
1723
1724 #[tokio::test]
1728 async fn persist_message_saves_correct_tool_use_parts() {
1729 use zeph_llm::provider::MessagePart;
1730
1731 let provider = mock_provider(vec![]);
1732 let channel = MockChannel::new(vec![]);
1733 let registry = create_test_registry();
1734 let executor = MockToolExecutor::no_tools();
1735
1736 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1737 let cid = memory.sqlite().create_conversation().await.unwrap();
1738
1739 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1740 std::sync::Arc::new(memory),
1741 cid,
1742 50,
1743 5,
1744 100,
1745 );
1746
1747 let parts = vec![MessagePart::ToolUse {
1748 id: "call_abc123".to_string(),
1749 name: "read_file".to_string(),
1750 input: serde_json::json!({"path": "/tmp/test.txt"}),
1751 }];
1752 let content = "[tool_use: read_file(call_abc123)]";
1753
1754 agent
1755 .persist_message(Role::Assistant, content, &parts, false)
1756 .await;
1757
1758 let history = agent
1759 .services
1760 .memory
1761 .persistence
1762 .memory
1763 .as_ref()
1764 .unwrap()
1765 .sqlite()
1766 .load_history(cid, 50)
1767 .await
1768 .unwrap();
1769
1770 assert_eq!(history.len(), 1);
1771 assert_eq!(history[0].role, Role::Assistant);
1772 assert_eq!(history[0].content, content);
1773 assert_eq!(history[0].parts.len(), 1);
1774 match &history[0].parts[0] {
1775 MessagePart::ToolUse { id, name, .. } => {
1776 assert_eq!(id, "call_abc123");
1777 assert_eq!(name, "read_file");
1778 }
1779 other => panic!("expected ToolUse part, got {other:?}"),
1780 }
1781 assert!(
1783 !history[0]
1784 .parts
1785 .iter()
1786 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1787 "assistant message must not contain ToolResult parts"
1788 );
1789 }
1790
1791 #[tokio::test]
1792 async fn persist_message_saves_correct_tool_result_parts() {
1793 use zeph_llm::provider::MessagePart;
1794
1795 let provider = mock_provider(vec![]);
1796 let channel = MockChannel::new(vec![]);
1797 let registry = create_test_registry();
1798 let executor = MockToolExecutor::no_tools();
1799
1800 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1801 let cid = memory.sqlite().create_conversation().await.unwrap();
1802
1803 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1804 std::sync::Arc::new(memory),
1805 cid,
1806 50,
1807 5,
1808 100,
1809 );
1810
1811 let parts = vec![MessagePart::ToolResult {
1812 tool_use_id: "call_abc123".to_string(),
1813 content: "file contents here".to_string(),
1814 is_error: false,
1815 }];
1816 let content = "[tool_result: call_abc123]\nfile contents here";
1817
1818 agent
1819 .persist_message(Role::User, content, &parts, false)
1820 .await;
1821
1822 let history = agent
1823 .services
1824 .memory
1825 .persistence
1826 .memory
1827 .as_ref()
1828 .unwrap()
1829 .sqlite()
1830 .load_history(cid, 50)
1831 .await
1832 .unwrap();
1833
1834 assert_eq!(history.len(), 1);
1835 assert_eq!(history[0].role, Role::User);
1836 assert_eq!(history[0].content, content);
1837 assert_eq!(history[0].parts.len(), 1);
1838 match &history[0].parts[0] {
1839 MessagePart::ToolResult {
1840 tool_use_id,
1841 content: result_content,
1842 is_error,
1843 } => {
1844 assert_eq!(tool_use_id, "call_abc123");
1845 assert_eq!(result_content, "file contents here");
1846 assert!(!is_error);
1847 }
1848 other => panic!("expected ToolResult part, got {other:?}"),
1849 }
1850 assert!(
1852 !history[0]
1853 .parts
1854 .iter()
1855 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1856 "user ToolResult message must not contain ToolUse parts"
1857 );
1858 }
1859
1860 #[tokio::test]
1861 async fn persist_message_roundtrip_preserves_role_part_alignment() {
1862 use zeph_llm::provider::MessagePart;
1863
1864 let provider = mock_provider(vec![]);
1865 let channel = MockChannel::new(vec![]);
1866 let registry = create_test_registry();
1867 let executor = MockToolExecutor::no_tools();
1868
1869 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1870 let cid = memory.sqlite().create_conversation().await.unwrap();
1871
1872 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1873 std::sync::Arc::new(memory),
1874 cid,
1875 50,
1876 5,
1877 100,
1878 );
1879
1880 let assistant_parts = vec![MessagePart::ToolUse {
1882 id: "id_1".to_string(),
1883 name: "list_dir".to_string(),
1884 input: serde_json::json!({"path": "/tmp"}),
1885 }];
1886 agent
1887 .persist_message(
1888 Role::Assistant,
1889 "[tool_use: list_dir(id_1)]",
1890 &assistant_parts,
1891 false,
1892 )
1893 .await;
1894
1895 let user_parts = vec![MessagePart::ToolResult {
1897 tool_use_id: "id_1".to_string(),
1898 content: "file1.txt\nfile2.txt".to_string(),
1899 is_error: false,
1900 }];
1901 agent
1902 .persist_message(
1903 Role::User,
1904 "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1905 &user_parts,
1906 false,
1907 )
1908 .await;
1909
1910 let history = agent
1911 .services
1912 .memory
1913 .persistence
1914 .memory
1915 .as_ref()
1916 .unwrap()
1917 .sqlite()
1918 .load_history(cid, 50)
1919 .await
1920 .unwrap();
1921
1922 assert_eq!(history.len(), 2);
1923
1924 assert_eq!(history[0].role, Role::Assistant);
1926 assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1927 assert!(
1928 matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1929 "first message must be assistant ToolUse"
1930 );
1931
1932 assert_eq!(history[1].role, Role::User);
1934 assert_eq!(
1935 history[1].content,
1936 "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1937 );
1938 assert!(
1939 matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1940 "second message must be user ToolResult"
1941 );
1942
1943 assert!(
1945 !history[0]
1946 .parts
1947 .iter()
1948 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1949 "assistant message must not have ToolResult parts"
1950 );
1951 assert!(
1952 !history[1]
1953 .parts
1954 .iter()
1955 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1956 "user message must not have ToolUse parts"
1957 );
1958 }
1959
1960 #[tokio::test]
1961 async fn persist_message_saves_correct_tool_output_parts() {
1962 use zeph_llm::provider::MessagePart;
1963
1964 let provider = mock_provider(vec![]);
1965 let channel = MockChannel::new(vec![]);
1966 let registry = create_test_registry();
1967 let executor = MockToolExecutor::no_tools();
1968
1969 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1970 let cid = memory.sqlite().create_conversation().await.unwrap();
1971
1972 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1973 std::sync::Arc::new(memory),
1974 cid,
1975 50,
1976 5,
1977 100,
1978 );
1979
1980 let parts = vec![MessagePart::ToolOutput {
1981 tool_name: "shell".into(),
1982 body: "hello from shell".to_string(),
1983 compacted_at: None,
1984 }];
1985 let content = "[tool: shell]\nhello from shell";
1986
1987 agent
1988 .persist_message(Role::User, content, &parts, false)
1989 .await;
1990
1991 let history = agent
1992 .services
1993 .memory
1994 .persistence
1995 .memory
1996 .as_ref()
1997 .unwrap()
1998 .sqlite()
1999 .load_history(cid, 50)
2000 .await
2001 .unwrap();
2002
2003 assert_eq!(history.len(), 1);
2004 assert_eq!(history[0].role, Role::User);
2005 assert_eq!(history[0].content, content);
2006 assert_eq!(history[0].parts.len(), 1);
2007 match &history[0].parts[0] {
2008 MessagePart::ToolOutput {
2009 tool_name,
2010 body,
2011 compacted_at,
2012 } => {
2013 assert_eq!(tool_name, "shell");
2014 assert_eq!(body, "hello from shell");
2015 assert!(compacted_at.is_none());
2016 }
2017 other => panic!("expected ToolOutput part, got {other:?}"),
2018 }
2019 }
2020
2021 #[tokio::test]
2024 async fn load_history_removes_trailing_orphan_tool_use() {
2025 use zeph_llm::provider::MessagePart;
2026
2027 let provider = mock_provider(vec![]);
2028 let channel = MockChannel::new(vec![]);
2029 let registry = create_test_registry();
2030 let executor = MockToolExecutor::no_tools();
2031
2032 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2033 let cid = memory.sqlite().create_conversation().await.unwrap();
2034 let sqlite = memory.sqlite();
2035
2036 sqlite
2038 .save_message(cid, "user", "do something with a tool")
2039 .await
2040 .unwrap();
2041
2042 let parts = serde_json::to_string(&[MessagePart::ToolUse {
2044 id: "call_orphan".to_string(),
2045 name: "shell".to_string(),
2046 input: serde_json::json!({"command": "ls"}),
2047 }])
2048 .unwrap();
2049 sqlite
2050 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2051 .await
2052 .unwrap();
2053
2054 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2055 std::sync::Arc::new(memory),
2056 cid,
2057 50,
2058 5,
2059 100,
2060 );
2061
2062 let messages_before = agent.msg.messages.len();
2063 agent.load_history().await.unwrap();
2064
2065 assert_eq!(
2067 agent.msg.messages.len(),
2068 messages_before + 1,
2069 "orphaned trailing tool_use must be removed"
2070 );
2071 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2072 }
2073
2074 #[tokio::test]
2075 async fn load_history_removes_leading_orphan_tool_result() {
2076 use zeph_llm::provider::MessagePart;
2077
2078 let provider = mock_provider(vec![]);
2079 let channel = MockChannel::new(vec![]);
2080 let registry = create_test_registry();
2081 let executor = MockToolExecutor::no_tools();
2082
2083 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2084 let cid = memory.sqlite().create_conversation().await.unwrap();
2085 let sqlite = memory.sqlite();
2086
2087 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2089 tool_use_id: "call_missing".to_string(),
2090 content: "result data".to_string(),
2091 is_error: false,
2092 }])
2093 .unwrap();
2094 sqlite
2095 .save_message_with_parts(
2096 cid,
2097 "user",
2098 "[tool_result: call_missing]\nresult data",
2099 &result_parts,
2100 )
2101 .await
2102 .unwrap();
2103
2104 sqlite
2106 .save_message(cid, "assistant", "here is my response")
2107 .await
2108 .unwrap();
2109
2110 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2111 std::sync::Arc::new(memory),
2112 cid,
2113 50,
2114 5,
2115 100,
2116 );
2117
2118 let messages_before = agent.msg.messages.len();
2119 agent.load_history().await.unwrap();
2120
2121 assert_eq!(
2123 agent.msg.messages.len(),
2124 messages_before + 1,
2125 "orphaned leading tool_result must be removed"
2126 );
2127 assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2128 }
2129
2130 #[tokio::test]
2131 async fn load_history_preserves_complete_tool_pairs() {
2132 use zeph_llm::provider::MessagePart;
2133
2134 let provider = mock_provider(vec![]);
2135 let channel = MockChannel::new(vec![]);
2136 let registry = create_test_registry();
2137 let executor = MockToolExecutor::no_tools();
2138
2139 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2140 let cid = memory.sqlite().create_conversation().await.unwrap();
2141 let sqlite = memory.sqlite();
2142
2143 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2145 id: "call_ok".to_string(),
2146 name: "shell".to_string(),
2147 input: serde_json::json!({"command": "pwd"}),
2148 }])
2149 .unwrap();
2150 sqlite
2151 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2152 .await
2153 .unwrap();
2154
2155 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2156 tool_use_id: "call_ok".to_string(),
2157 content: "/home/user".to_string(),
2158 is_error: false,
2159 }])
2160 .unwrap();
2161 sqlite
2162 .save_message_with_parts(
2163 cid,
2164 "user",
2165 "[tool_result: call_ok]\n/home/user",
2166 &result_parts,
2167 )
2168 .await
2169 .unwrap();
2170
2171 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2172 std::sync::Arc::new(memory),
2173 cid,
2174 50,
2175 5,
2176 100,
2177 );
2178
2179 let messages_before = agent.msg.messages.len();
2180 agent.load_history().await.unwrap();
2181
2182 assert_eq!(
2184 agent.msg.messages.len(),
2185 messages_before + 2,
2186 "complete tool_use/tool_result pair must be preserved"
2187 );
2188 assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2189 assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2190 }
2191
2192 #[tokio::test]
2193 async fn load_history_handles_multiple_trailing_orphans() {
2194 use zeph_llm::provider::MessagePart;
2195
2196 let provider = mock_provider(vec![]);
2197 let channel = MockChannel::new(vec![]);
2198 let registry = create_test_registry();
2199 let executor = MockToolExecutor::no_tools();
2200
2201 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2202 let cid = memory.sqlite().create_conversation().await.unwrap();
2203 let sqlite = memory.sqlite();
2204
2205 sqlite.save_message(cid, "user", "start").await.unwrap();
2207
2208 let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2210 id: "call_1".to_string(),
2211 name: "shell".to_string(),
2212 input: serde_json::json!({}),
2213 }])
2214 .unwrap();
2215 sqlite
2216 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2217 .await
2218 .unwrap();
2219
2220 let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2222 id: "call_2".to_string(),
2223 name: "read_file".to_string(),
2224 input: serde_json::json!({}),
2225 }])
2226 .unwrap();
2227 sqlite
2228 .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2229 .await
2230 .unwrap();
2231
2232 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2233 std::sync::Arc::new(memory),
2234 cid,
2235 50,
2236 5,
2237 100,
2238 );
2239
2240 let messages_before = agent.msg.messages.len();
2241 agent.load_history().await.unwrap();
2242
2243 assert_eq!(
2245 agent.msg.messages.len(),
2246 messages_before + 1,
2247 "all trailing orphaned tool_use messages must be removed"
2248 );
2249 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2250 }
2251
2252 #[tokio::test]
2253 async fn load_history_no_tool_messages_unchanged() {
2254 let provider = mock_provider(vec![]);
2255 let channel = MockChannel::new(vec![]);
2256 let registry = create_test_registry();
2257 let executor = MockToolExecutor::no_tools();
2258
2259 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2260 let cid = memory.sqlite().create_conversation().await.unwrap();
2261 let sqlite = memory.sqlite();
2262
2263 sqlite.save_message(cid, "user", "hello").await.unwrap();
2264 sqlite
2265 .save_message(cid, "assistant", "hi there")
2266 .await
2267 .unwrap();
2268 sqlite
2269 .save_message(cid, "user", "how are you?")
2270 .await
2271 .unwrap();
2272
2273 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2274 std::sync::Arc::new(memory),
2275 cid,
2276 50,
2277 5,
2278 100,
2279 );
2280
2281 let messages_before = agent.msg.messages.len();
2282 agent.load_history().await.unwrap();
2283
2284 assert_eq!(
2286 agent.msg.messages.len(),
2287 messages_before + 3,
2288 "plain messages without tool parts must pass through unchanged"
2289 );
2290 }
2291
2292 #[tokio::test]
2293 async fn load_history_removes_both_leading_and_trailing_orphans() {
2294 use zeph_llm::provider::MessagePart;
2295
2296 let provider = mock_provider(vec![]);
2297 let channel = MockChannel::new(vec![]);
2298 let registry = create_test_registry();
2299 let executor = MockToolExecutor::no_tools();
2300
2301 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2302 let cid = memory.sqlite().create_conversation().await.unwrap();
2303 let sqlite = memory.sqlite();
2304
2305 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2307 tool_use_id: "call_leading".to_string(),
2308 content: "orphaned result".to_string(),
2309 is_error: false,
2310 }])
2311 .unwrap();
2312 sqlite
2313 .save_message_with_parts(
2314 cid,
2315 "user",
2316 "[tool_result: call_leading]\norphaned result",
2317 &result_parts,
2318 )
2319 .await
2320 .unwrap();
2321
2322 sqlite
2324 .save_message(cid, "user", "what is 2+2?")
2325 .await
2326 .unwrap();
2327 sqlite.save_message(cid, "assistant", "4").await.unwrap();
2328
2329 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2331 id: "call_trailing".to_string(),
2332 name: "shell".to_string(),
2333 input: serde_json::json!({"command": "date"}),
2334 }])
2335 .unwrap();
2336 sqlite
2337 .save_message_with_parts(
2338 cid,
2339 "assistant",
2340 "[tool_use: shell(call_trailing)]",
2341 &use_parts,
2342 )
2343 .await
2344 .unwrap();
2345
2346 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2347 std::sync::Arc::new(memory),
2348 cid,
2349 50,
2350 5,
2351 100,
2352 );
2353
2354 let messages_before = agent.msg.messages.len();
2355 agent.load_history().await.unwrap();
2356
2357 assert_eq!(
2359 agent.msg.messages.len(),
2360 messages_before + 2,
2361 "both leading and trailing orphans must be removed"
2362 );
2363 assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2364 assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2365 assert_eq!(
2366 agent.msg.messages[messages_before + 1].role,
2367 Role::Assistant
2368 );
2369 assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2370 }
2371
2372 #[tokio::test]
2377 async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2378 use zeph_llm::provider::MessagePart;
2379
2380 let provider = mock_provider(vec![]);
2381 let channel = MockChannel::new(vec![]);
2382 let registry = create_test_registry();
2383 let executor = MockToolExecutor::no_tools();
2384
2385 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2386 let cid = memory.sqlite().create_conversation().await.unwrap();
2387 let sqlite = memory.sqlite();
2388
2389 sqlite
2391 .save_message(cid, "user", "first question")
2392 .await
2393 .unwrap();
2394 sqlite
2395 .save_message(cid, "assistant", "first answer")
2396 .await
2397 .unwrap();
2398
2399 let use_parts = serde_json::to_string(&[
2403 MessagePart::ToolUse {
2404 id: "call_mid_1".to_string(),
2405 name: "shell".to_string(),
2406 input: serde_json::json!({"command": "ls"}),
2407 },
2408 MessagePart::Text {
2409 text: "Let me check the files.".to_string(),
2410 },
2411 ])
2412 .unwrap();
2413 sqlite
2414 .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2415 .await
2416 .unwrap();
2417
2418 sqlite
2420 .save_message(cid, "user", "second question")
2421 .await
2422 .unwrap();
2423 sqlite
2424 .save_message(cid, "assistant", "second answer")
2425 .await
2426 .unwrap();
2427
2428 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2429 std::sync::Arc::new(memory),
2430 cid,
2431 50,
2432 5,
2433 100,
2434 );
2435
2436 let messages_before = agent.msg.messages.len();
2437 agent.load_history().await.unwrap();
2438
2439 assert_eq!(
2442 agent.msg.messages.len(),
2443 messages_before + 5,
2444 "message count must be 5 (orphan message kept — has text content)"
2445 );
2446
2447 let orphan = &agent.msg.messages[messages_before + 2];
2449 assert_eq!(orphan.role, Role::Assistant);
2450 assert!(
2451 !orphan
2452 .parts
2453 .iter()
2454 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2455 "orphaned ToolUse parts must be stripped from mid-history message"
2456 );
2457 assert!(
2459 orphan.parts.iter().any(
2460 |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2461 ),
2462 "text content of orphaned assistant message must be preserved"
2463 );
2464 }
2465
2466 #[tokio::test]
2471 async fn load_history_keeps_tool_only_user_message() {
2472 use zeph_llm::provider::MessagePart;
2473
2474 let provider = mock_provider(vec![]);
2475 let channel = MockChannel::new(vec![]);
2476 let registry = create_test_registry();
2477 let executor = MockToolExecutor::no_tools();
2478
2479 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2480 let cid = memory.sqlite().create_conversation().await.unwrap();
2481 let sqlite = memory.sqlite();
2482
2483 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2485 id: "call_rc3".to_string(),
2486 name: "memory_save".to_string(),
2487 input: serde_json::json!({"content": "something"}),
2488 }])
2489 .unwrap();
2490 sqlite
2491 .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2492 .await
2493 .unwrap();
2494
2495 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2497 tool_use_id: "call_rc3".to_string(),
2498 content: "saved".to_string(),
2499 is_error: false,
2500 }])
2501 .unwrap();
2502 sqlite
2503 .save_message_with_parts(cid, "user", "", &result_parts)
2504 .await
2505 .unwrap();
2506
2507 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2508
2509 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2510 std::sync::Arc::new(memory),
2511 cid,
2512 50,
2513 5,
2514 100,
2515 );
2516
2517 let messages_before = agent.msg.messages.len();
2518 agent.load_history().await.unwrap();
2519
2520 assert_eq!(
2523 agent.msg.messages.len(),
2524 messages_before + 3,
2525 "user message with empty content but ToolResult parts must not be dropped"
2526 );
2527
2528 let user_msg = &agent.msg.messages[messages_before + 1];
2530 assert_eq!(user_msg.role, Role::User);
2531 assert!(
2532 user_msg.parts.iter().any(
2533 |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2534 ),
2535 "ToolResult part must be preserved on user message with empty content"
2536 );
2537 }
2538
2539 #[tokio::test]
2543 async fn strip_orphans_removes_orphaned_tool_result() {
2544 use zeph_llm::provider::MessagePart;
2545
2546 let provider = mock_provider(vec![]);
2547 let channel = MockChannel::new(vec![]);
2548 let registry = create_test_registry();
2549 let executor = MockToolExecutor::no_tools();
2550
2551 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2552 let cid = memory.sqlite().create_conversation().await.unwrap();
2553 let sqlite = memory.sqlite();
2554
2555 sqlite.save_message(cid, "user", "hello").await.unwrap();
2557 sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2558
2559 sqlite
2561 .save_message(cid, "assistant", "plain answer")
2562 .await
2563 .unwrap();
2564
2565 let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2567 tool_use_id: "call_nonexistent".to_string(),
2568 content: "stale result".to_string(),
2569 is_error: false,
2570 }])
2571 .unwrap();
2572 sqlite
2573 .save_message_with_parts(
2574 cid,
2575 "user",
2576 "[tool_result: call_nonexistent]\nstale result",
2577 &orphan_result_parts,
2578 )
2579 .await
2580 .unwrap();
2581
2582 sqlite
2583 .save_message(cid, "assistant", "final")
2584 .await
2585 .unwrap();
2586
2587 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2588 std::sync::Arc::new(memory),
2589 cid,
2590 50,
2591 5,
2592 100,
2593 );
2594
2595 let messages_before = agent.msg.messages.len();
2596 agent.load_history().await.unwrap();
2597
2598 let loaded = &agent.msg.messages[messages_before..];
2602 for msg in loaded {
2603 assert!(
2604 !msg.parts.iter().any(|p| matches!(
2605 p,
2606 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2607 )),
2608 "orphaned ToolResult part must be stripped from history"
2609 );
2610 }
2611 }
2612
2613 #[tokio::test]
2616 async fn strip_orphans_keeps_complete_pair() {
2617 use zeph_llm::provider::MessagePart;
2618
2619 let provider = mock_provider(vec![]);
2620 let channel = MockChannel::new(vec![]);
2621 let registry = create_test_registry();
2622 let executor = MockToolExecutor::no_tools();
2623
2624 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2625 let cid = memory.sqlite().create_conversation().await.unwrap();
2626 let sqlite = memory.sqlite();
2627
2628 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2629 id: "call_valid".to_string(),
2630 name: "shell".to_string(),
2631 input: serde_json::json!({"command": "ls"}),
2632 }])
2633 .unwrap();
2634 sqlite
2635 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2636 .await
2637 .unwrap();
2638
2639 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2640 tool_use_id: "call_valid".to_string(),
2641 content: "file.rs".to_string(),
2642 is_error: false,
2643 }])
2644 .unwrap();
2645 sqlite
2646 .save_message_with_parts(cid, "user", "", &result_parts)
2647 .await
2648 .unwrap();
2649
2650 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2651 std::sync::Arc::new(memory),
2652 cid,
2653 50,
2654 5,
2655 100,
2656 );
2657
2658 let messages_before = agent.msg.messages.len();
2659 agent.load_history().await.unwrap();
2660
2661 assert_eq!(
2662 agent.msg.messages.len(),
2663 messages_before + 2,
2664 "complete tool_use/tool_result pair must be preserved"
2665 );
2666
2667 let user_msg = &agent.msg.messages[messages_before + 1];
2668 assert!(
2669 user_msg.parts.iter().any(|p| matches!(
2670 p,
2671 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2672 )),
2673 "ToolResult part for a matched tool_use must not be stripped"
2674 );
2675 }
2676
2677 #[tokio::test]
2680 async fn strip_orphans_mixed_history() {
2681 use zeph_llm::provider::MessagePart;
2682
2683 let provider = mock_provider(vec![]);
2684 let channel = MockChannel::new(vec![]);
2685 let registry = create_test_registry();
2686 let executor = MockToolExecutor::no_tools();
2687
2688 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2689 let cid = memory.sqlite().create_conversation().await.unwrap();
2690 let sqlite = memory.sqlite();
2691
2692 let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2694 id: "call_good".to_string(),
2695 name: "shell".to_string(),
2696 input: serde_json::json!({"command": "pwd"}),
2697 }])
2698 .unwrap();
2699 sqlite
2700 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2701 .await
2702 .unwrap();
2703
2704 let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2705 tool_use_id: "call_good".to_string(),
2706 content: "/home".to_string(),
2707 is_error: false,
2708 }])
2709 .unwrap();
2710 sqlite
2711 .save_message_with_parts(cid, "user", "", &result_parts_ok)
2712 .await
2713 .unwrap();
2714
2715 sqlite
2717 .save_message(cid, "assistant", "text only")
2718 .await
2719 .unwrap();
2720
2721 let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2722 tool_use_id: "call_ghost".to_string(),
2723 content: "ghost result".to_string(),
2724 is_error: false,
2725 }])
2726 .unwrap();
2727 sqlite
2728 .save_message_with_parts(
2729 cid,
2730 "user",
2731 "[tool_result: call_ghost]\nghost result",
2732 &orphan_parts,
2733 )
2734 .await
2735 .unwrap();
2736
2737 sqlite
2738 .save_message(cid, "assistant", "final reply")
2739 .await
2740 .unwrap();
2741
2742 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2743 std::sync::Arc::new(memory),
2744 cid,
2745 50,
2746 5,
2747 100,
2748 );
2749
2750 let messages_before = agent.msg.messages.len();
2751 agent.load_history().await.unwrap();
2752
2753 let loaded = &agent.msg.messages[messages_before..];
2754
2755 for msg in loaded {
2757 assert!(
2758 !msg.parts.iter().any(|p| matches!(
2759 p,
2760 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2761 )),
2762 "orphaned ToolResult (call_ghost) must be stripped from history"
2763 );
2764 }
2765
2766 let has_good_result = loaded.iter().any(|msg| {
2769 msg.role == Role::User
2770 && msg.parts.iter().any(|p| {
2771 matches!(
2772 p,
2773 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2774 )
2775 })
2776 });
2777 assert!(
2778 has_good_result,
2779 "matched ToolResult (call_good) must be preserved in history"
2780 );
2781 }
2782
2783 #[tokio::test]
2786 async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2787 use zeph_llm::provider::MessagePart;
2788
2789 let provider = mock_provider(vec![]);
2790 let channel = MockChannel::new(vec![]);
2791 let registry = create_test_registry();
2792 let executor = MockToolExecutor::no_tools();
2793
2794 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2795 let cid = memory.sqlite().create_conversation().await.unwrap();
2796 let sqlite = memory.sqlite();
2797
2798 sqlite
2799 .save_message(cid, "user", "run a command")
2800 .await
2801 .unwrap();
2802
2803 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2805 id: "call_ok".to_string(),
2806 name: "shell".to_string(),
2807 input: serde_json::json!({"command": "echo hi"}),
2808 }])
2809 .unwrap();
2810 sqlite
2811 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2812 .await
2813 .unwrap();
2814
2815 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2817 tool_use_id: "call_ok".to_string(),
2818 content: "hi".to_string(),
2819 is_error: false,
2820 }])
2821 .unwrap();
2822 sqlite
2823 .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2824 .await
2825 .unwrap();
2826
2827 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2828
2829 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2830 std::sync::Arc::new(memory),
2831 cid,
2832 50,
2833 5,
2834 100,
2835 );
2836
2837 let messages_before = agent.msg.messages.len();
2838 agent.load_history().await.unwrap();
2839
2840 assert_eq!(
2842 agent.msg.messages.len(),
2843 messages_before + 4,
2844 "matched tool pair must not be removed"
2845 );
2846 let tool_msg = &agent.msg.messages[messages_before + 1];
2847 assert!(
2848 tool_msg
2849 .parts
2850 .iter()
2851 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2852 "matched ToolUse parts must be preserved"
2853 );
2854 }
2855
2856 #[tokio::test]
2860 async fn persist_cancelled_tool_results_pairs_tool_use() {
2861 use zeph_llm::provider::MessagePart;
2862
2863 let provider = mock_provider(vec![]);
2864 let channel = MockChannel::new(vec![]);
2865 let registry = create_test_registry();
2866 let executor = MockToolExecutor::no_tools();
2867
2868 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2869 let cid = memory.sqlite().create_conversation().await.unwrap();
2870
2871 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2872 std::sync::Arc::new(memory),
2873 cid,
2874 50,
2875 5,
2876 100,
2877 );
2878
2879 let tool_calls = vec![
2881 zeph_llm::provider::ToolUseRequest {
2882 id: "cancel_id_1".to_string(),
2883 name: "shell".to_string().into(),
2884 input: serde_json::json!({}),
2885 },
2886 zeph_llm::provider::ToolUseRequest {
2887 id: "cancel_id_2".to_string(),
2888 name: "read_file".to_string().into(),
2889 input: serde_json::json!({}),
2890 },
2891 ];
2892
2893 agent.persist_cancelled_tool_results(&tool_calls).await;
2894
2895 let history = agent
2896 .services
2897 .memory
2898 .persistence
2899 .memory
2900 .as_ref()
2901 .unwrap()
2902 .sqlite()
2903 .load_history(cid, 50)
2904 .await
2905 .unwrap();
2906
2907 assert_eq!(history.len(), 1);
2909 assert_eq!(history[0].role, Role::User);
2910
2911 for tc in &tool_calls {
2913 assert!(
2914 history[0].parts.iter().any(|p| matches!(
2915 p,
2916 MessagePart::ToolResult { tool_use_id, is_error, .. }
2917 if tool_use_id == &tc.id && *is_error
2918 )),
2919 "tombstone ToolResult for {} must be present and is_error=true",
2920 tc.id
2921 );
2922 }
2923 }
2924
2925 #[tokio::test]
2936 async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
2937 use zeph_llm::provider::MessagePart;
2938
2939 let provider = mock_provider(vec![]);
2940 let channel = MockChannel::new(vec![]);
2941 let registry = create_test_registry();
2942 let executor = MockToolExecutor::no_tools();
2943
2944 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2945 let cid = memory.sqlite().create_conversation().await.unwrap();
2946 let sqlite = memory.sqlite();
2947
2948 sqlite
2950 .save_message(cid, "user", "save this for me")
2951 .await
2952 .unwrap();
2953
2954 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2957 id: "call_2529".to_string(),
2958 name: "memory_save".to_string(),
2959 input: serde_json::json!({"content": "save this"}),
2960 }])
2961 .unwrap();
2962 let orphan_assistant_id = sqlite
2963 .save_message_with_parts(
2964 cid,
2965 "assistant",
2966 "[tool_use: memory_save(call_2529)]",
2967 &use_parts,
2968 )
2969 .await
2970 .unwrap();
2971
2972 sqlite
2977 .save_message(cid, "assistant", "here is a plain reply")
2978 .await
2979 .unwrap();
2980
2981 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2982 tool_use_id: "call_2529".to_string(),
2983 content: "saved".to_string(),
2984 is_error: false,
2985 }])
2986 .unwrap();
2987 let orphan_user_id = sqlite
2988 .save_message_with_parts(
2989 cid,
2990 "user",
2991 "[tool_result: call_2529]\nsaved",
2992 &result_parts,
2993 )
2994 .await
2995 .unwrap();
2996
2997 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2999
3000 let memory_arc = std::sync::Arc::new(memory);
3001 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3002 memory_arc.clone(),
3003 cid,
3004 50,
3005 5,
3006 100,
3007 );
3008
3009 agent.load_history().await.unwrap();
3010
3011 let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3014 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3015 )
3016 .bind(orphan_assistant_id)
3017 .fetch_all(memory_arc.sqlite().pool())
3018 .await
3019 .unwrap();
3020
3021 let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3022 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3023 )
3024 .bind(orphan_user_id)
3025 .fetch_all(memory_arc.sqlite().pool())
3026 .await
3027 .unwrap();
3028
3029 assert_eq!(
3030 assistant_deleted_count.first().copied().unwrap_or(0),
3031 1,
3032 "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3033 );
3034 assert_eq!(
3035 user_deleted_count.first().copied().unwrap_or(0),
3036 1,
3037 "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3038 );
3039 }
3040
3041 #[tokio::test]
3045 async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3046 use zeph_llm::provider::MessagePart;
3047
3048 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3049 let cid = memory.sqlite().create_conversation().await.unwrap();
3050 let sqlite = memory.sqlite();
3051
3052 sqlite
3054 .save_message(cid, "user", "do something")
3055 .await
3056 .unwrap();
3057
3058 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3060 id: "call_idem".to_string(),
3061 name: "shell".to_string(),
3062 input: serde_json::json!({"command": "ls"}),
3063 }])
3064 .unwrap();
3065 sqlite
3066 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3067 .await
3068 .unwrap();
3069
3070 sqlite
3072 .save_message(cid, "assistant", "continuing")
3073 .await
3074 .unwrap();
3075
3076 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3078 tool_use_id: "call_idem".to_string(),
3079 content: "output".to_string(),
3080 is_error: false,
3081 }])
3082 .unwrap();
3083 sqlite
3084 .save_message_with_parts(
3085 cid,
3086 "user",
3087 "[tool_result: call_idem]\noutput",
3088 &result_parts,
3089 )
3090 .await
3091 .unwrap();
3092
3093 sqlite
3094 .save_message(cid, "assistant", "final")
3095 .await
3096 .unwrap();
3097
3098 let memory_arc = std::sync::Arc::new(memory);
3099
3100 let mut agent1 = Agent::new(
3102 mock_provider(vec![]),
3103 MockChannel::new(vec![]),
3104 create_test_registry(),
3105 None,
3106 5,
3107 MockToolExecutor::no_tools(),
3108 )
3109 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3110 agent1.load_history().await.unwrap();
3111 let count_after_first = agent1.msg.messages.len();
3112
3113 let mut agent2 = Agent::new(
3116 mock_provider(vec![]),
3117 MockChannel::new(vec![]),
3118 create_test_registry(),
3119 None,
3120 5,
3121 MockToolExecutor::no_tools(),
3122 )
3123 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3124 agent2.load_history().await.unwrap();
3125 let count_after_second = agent2.msg.messages.len();
3126
3127 assert_eq!(
3129 count_after_first, count_after_second,
3130 "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3131 );
3132 }
3133
3134 #[tokio::test]
3138 async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3139 use zeph_llm::provider::MessagePart;
3140
3141 let provider = mock_provider(vec![]);
3142 let channel = MockChannel::new(vec![]);
3143 let registry = create_test_registry();
3144 let executor = MockToolExecutor::no_tools();
3145
3146 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3147 let cid = memory.sqlite().create_conversation().await.unwrap();
3148 let sqlite = memory.sqlite();
3149
3150 sqlite
3152 .save_message(cid, "user", "check the files")
3153 .await
3154 .unwrap();
3155
3156 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3159 id: "call_mixed".to_string(),
3160 name: "shell".to_string(),
3161 input: serde_json::json!({"command": "ls"}),
3162 }])
3163 .unwrap();
3164 sqlite
3165 .save_message_with_parts(
3166 cid,
3167 "assistant",
3168 "Let me list the directory. [tool_use: shell(call_mixed)]",
3169 &use_parts,
3170 )
3171 .await
3172 .unwrap();
3173
3174 sqlite.save_message(cid, "user", "thanks").await.unwrap();
3176 sqlite
3177 .save_message(cid, "assistant", "you are welcome")
3178 .await
3179 .unwrap();
3180
3181 let memory_arc = std::sync::Arc::new(memory);
3182 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3183 memory_arc.clone(),
3184 cid,
3185 50,
3186 5,
3187 100,
3188 );
3189
3190 let messages_before = agent.msg.messages.len();
3191 agent.load_history().await.unwrap();
3192
3193 assert_eq!(
3195 agent.msg.messages.len(),
3196 messages_before + 4,
3197 "assistant message with text + tool tag must not be removed after ToolUse strip"
3198 );
3199
3200 let mixed_msg = agent
3202 .msg
3203 .messages
3204 .iter()
3205 .find(|m| m.content.contains("Let me list the directory"))
3206 .expect("mixed-content assistant message must still be in history");
3207 assert!(
3208 !mixed_msg
3209 .parts
3210 .iter()
3211 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3212 "orphaned ToolUse parts must be stripped even when message has meaningful text"
3213 );
3214 assert_eq!(
3215 mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3216 "content field must be unchanged — only parts are stripped"
3217 );
3218 }
3219
3220 #[tokio::test]
3223 async fn persist_message_skipped_tool_result_does_not_embed() {
3224 use zeph_llm::provider::MessagePart;
3225
3226 let provider = mock_provider(vec![]);
3227 let channel = MockChannel::new(vec![]);
3228 let registry = create_test_registry();
3229 let executor = MockToolExecutor::no_tools();
3230
3231 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3232 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3233 let cid = memory.sqlite().create_conversation().await.unwrap();
3234
3235 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3236 .with_metrics(tx)
3237 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3238 agent.services.memory.persistence.autosave_assistant = true;
3239 agent.services.memory.persistence.autosave_min_length = 0;
3240
3241 let parts = vec![MessagePart::ToolResult {
3242 tool_use_id: "tu1".into(),
3243 content: "[skipped] bash tool was blocked by utility gate".into(),
3244 is_error: false,
3245 }];
3246
3247 agent
3248 .persist_message(
3249 Role::User,
3250 "[skipped] bash tool was blocked by utility gate",
3251 &parts,
3252 false,
3253 )
3254 .await;
3255
3256 assert_eq!(
3257 rx.borrow().embeddings_generated,
3258 0,
3259 "[skipped] ToolResult must not be embedded into Qdrant"
3260 );
3261 }
3262
3263 #[tokio::test]
3264 async fn persist_message_stopped_tool_result_does_not_embed() {
3265 use zeph_llm::provider::MessagePart;
3266
3267 let provider = mock_provider(vec![]);
3268 let channel = MockChannel::new(vec![]);
3269 let registry = create_test_registry();
3270 let executor = MockToolExecutor::no_tools();
3271
3272 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3273 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3274 let cid = memory.sqlite().create_conversation().await.unwrap();
3275
3276 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3277 .with_metrics(tx)
3278 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3279 agent.services.memory.persistence.autosave_assistant = true;
3280 agent.services.memory.persistence.autosave_min_length = 0;
3281
3282 let parts = vec![MessagePart::ToolResult {
3283 tool_use_id: "tu2".into(),
3284 content: "[stopped] execution limit reached".into(),
3285 is_error: false,
3286 }];
3287
3288 agent
3289 .persist_message(
3290 Role::User,
3291 "[stopped] execution limit reached",
3292 &parts,
3293 false,
3294 )
3295 .await;
3296
3297 assert_eq!(
3298 rx.borrow().embeddings_generated,
3299 0,
3300 "[stopped] ToolResult must not be embedded into Qdrant"
3301 );
3302 }
3303
3304 #[tokio::test]
3305 async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3306 use zeph_llm::provider::MessagePart;
3309
3310 let provider = mock_provider(vec![]);
3311 let channel = MockChannel::new(vec![]);
3312 let registry = create_test_registry();
3313 let executor = MockToolExecutor::no_tools();
3314
3315 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3316 let cid = memory.sqlite().create_conversation().await.unwrap();
3317 let memory_arc = std::sync::Arc::new(memory);
3318
3319 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3320 memory_arc.clone(),
3321 cid,
3322 50,
3323 5,
3324 100,
3325 );
3326 agent.services.memory.persistence.autosave_assistant = true;
3327 agent.services.memory.persistence.autosave_min_length = 0;
3328
3329 let content = "total 42\ndrwxr-xr-x 5 user group";
3330 let parts = vec![MessagePart::ToolResult {
3331 tool_use_id: "tu3".into(),
3332 content: content.into(),
3333 is_error: false,
3334 }];
3335
3336 agent
3337 .persist_message(Role::User, content, &parts, false)
3338 .await;
3339
3340 let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3342 assert_eq!(
3343 history.len(),
3344 1,
3345 "normal ToolResult must be saved to SQLite"
3346 );
3347 assert_eq!(history[0].content, content);
3348 }
3349
/// Pins the tail-window arithmetic for a history longer than the cap:
/// with 100 messages and a cap of 20, the window starts at 80 and spans 20 items.
#[test]
fn trajectory_extraction_slice_bounds_messages() {
    let (max_messages, total_messages): (usize, usize) = (20, 100);

    // Start of the tail; saturating so a short history cannot underflow.
    let tail_start = total_messages.saturating_sub(max_messages);
    // Number of items between the tail start and the end of the history.
    let window = (tail_start..total_messages).len();

    assert_eq!(
        window, 20,
        "slice should contain exactly max_messages items"
    );
    assert_eq!(tail_start, 80, "slice should start at len - max_messages");
}
3369
/// Pins the tail-window arithmetic for a history shorter than the cap:
/// with 5 messages and a cap of 20, the window starts at 0 and spans all 5 items.
#[test]
fn trajectory_extraction_slice_handles_few_messages() {
    let (max_messages, total_messages): (usize, usize) = (20, 5);

    // saturating_sub clamps to zero when the cap exceeds the history length.
    let tail_start = total_messages.saturating_sub(max_messages);
    let window = (tail_start..total_messages).len();

    assert_eq!(window, 5, "should return all messages when fewer than max");
    assert_eq!(tail_start, 0, "slice should start from the beginning");
}
3381
3382 #[tokio::test]
3388 async fn regression_3168_complete_tool_pair_survives_round_trip() {
3389 use zeph_llm::provider::MessagePart;
3390
3391 let provider = mock_provider(vec![]);
3392 let channel = MockChannel::new(vec![]);
3393 let registry = create_test_registry();
3394 let executor = MockToolExecutor::no_tools();
3395
3396 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3397 let cid = memory.sqlite().create_conversation().await.unwrap();
3398 let sqlite = memory.sqlite();
3399
3400 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3401 id: "r3168_call".to_string(),
3402 name: "shell".to_string(),
3403 input: serde_json::json!({"command": "echo hi"}),
3404 }])
3405 .unwrap();
3406 sqlite
3407 .save_message_with_parts(
3408 cid,
3409 "assistant",
3410 "[tool_use: shell(r3168_call)]",
3411 &use_parts,
3412 )
3413 .await
3414 .unwrap();
3415
3416 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3417 tool_use_id: "r3168_call".to_string(),
3418 content: "[skipped]".to_string(),
3419 is_error: false,
3420 }])
3421 .unwrap();
3422 sqlite
3423 .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3424 .await
3425 .unwrap();
3426
3427 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3428 std::sync::Arc::new(memory),
3429 cid,
3430 50,
3431 5,
3432 100,
3433 );
3434
3435 let base = agent.msg.messages.len();
3436 agent.load_history().await.unwrap();
3437
3438 assert_eq!(
3439 agent.msg.messages.len(),
3440 base + 2,
3441 "both messages of the complete pair must survive load_history"
3442 );
3443
3444 let assistant_msg = agent
3445 .msg
3446 .messages
3447 .iter()
3448 .find(|m| m.role == Role::Assistant)
3449 .expect("assistant message missing after load_history");
3450 assert!(
3451 assistant_msg
3452 .parts
3453 .iter()
3454 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3455 "ToolUse part must be preserved in assistant message"
3456 );
3457
3458 let user_msg = agent
3459 .msg
3460 .messages
3461 .iter()
3462 .rev()
3463 .find(|m| m.role == Role::User)
3464 .expect("user message missing after load_history");
3465 assert!(
3466 user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3467 "ToolResult part must be preserved in user message"
3468 );
3469 }
3470
3471 #[tokio::test]
3476 async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3477 use zeph_llm::provider::MessagePart;
3478
3479 let provider = mock_provider(vec![]);
3480 let channel = MockChannel::new(vec![]);
3481 let registry = create_test_registry();
3482 let executor = MockToolExecutor::no_tools();
3483
3484 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3485 let cid = memory.sqlite().create_conversation().await.unwrap();
3486 let sqlite = memory.sqlite();
3487
3488 sqlite
3492 .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3493 .await
3494 .unwrap();
3495
3496 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3498 tool_use_id: "corrupt".to_string(),
3499 content: "result".to_string(),
3500 is_error: false,
3501 }])
3502 .unwrap();
3503 sqlite
3504 .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3505 .await
3506 .unwrap();
3507
3508 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3509 std::sync::Arc::new(memory),
3510 cid,
3511 50,
3512 5,
3513 100,
3514 );
3515
3516 let base = agent.msg.messages.len();
3517 agent.load_history().await.unwrap();
3518
3519 let loaded = agent.msg.messages.len() - base;
3524 let orphan_present = agent.msg.messages.iter().any(|m| {
3527 m.role == Role::User
3528 && m.parts.iter().any(|p| {
3529 matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3530 })
3531 });
3532 assert!(
3533 !orphan_present,
3534 "orphaned ToolResult must not survive load_history; loaded={loaded}"
3535 );
3536 }
3537}