1use std::collections::HashSet;
5
6use crate::channel::Channel;
7use zeph_llm::provider::{LlmProvider as _, Message, MessagePart, Role};
8use zeph_memory::store::role_str;
9
10use super::Agent;
11
12fn sanitize_tool_pairs(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
28 let mut removed = 0;
29 let mut db_ids: Vec<i64> = Vec::new();
30
31 loop {
32 if let Some(last) = messages.last()
34 && last.role == Role::Assistant
35 && last
36 .parts
37 .iter()
38 .any(|p| matches!(p, MessagePart::ToolUse { .. }))
39 {
40 let ids: Vec<String> = last
41 .parts
42 .iter()
43 .filter_map(|p| {
44 if let MessagePart::ToolUse { id, .. } = p {
45 Some(id.clone())
46 } else {
47 None
48 }
49 })
50 .collect();
51 tracing::warn!(
52 tool_ids = ?ids,
53 "removing orphaned trailing tool_use message from restored history"
54 );
55 if let Some(db_id) = messages.last().and_then(|m| m.metadata.db_id) {
56 db_ids.push(db_id);
57 }
58 messages.pop();
59 removed += 1;
60 continue;
61 }
62
63 if let Some(first) = messages.first()
65 && first.role == Role::User
66 && first
67 .parts
68 .iter()
69 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
70 {
71 let ids: Vec<String> = first
72 .parts
73 .iter()
74 .filter_map(|p| {
75 if let MessagePart::ToolResult { tool_use_id, .. } = p {
76 Some(tool_use_id.clone())
77 } else {
78 None
79 }
80 })
81 .collect();
82 tracing::warn!(
83 tool_use_ids = ?ids,
84 "removing orphaned leading tool_result message from restored history"
85 );
86 if let Some(db_id) = messages.first().and_then(|m| m.metadata.db_id) {
87 db_ids.push(db_id);
88 }
89 messages.remove(0);
90 removed += 1;
91 continue;
92 }
93
94 break;
95 }
96
97 let (mid_removed, mid_db_ids) = strip_mid_history_orphans(messages);
100 removed += mid_removed;
101 db_ids.extend(mid_db_ids);
102
103 (removed, db_ids)
104}
105
106fn orphaned_tool_use_ids(msg: &Message, next_msg: Option<&Message>) -> HashSet<String> {
108 let matched: HashSet<String> = next_msg
109 .filter(|n| n.role == Role::User)
110 .map(|n| {
111 msg.parts
112 .iter()
113 .filter_map(|p| if let MessagePart::ToolUse { id, .. } = p { Some(id.clone()) } else { None })
114 .filter(|uid| n.parts.iter().any(|np| matches!(np, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == uid)))
115 .collect()
116 })
117 .unwrap_or_default();
118 msg.parts
119 .iter()
120 .filter_map(|p| {
121 if let MessagePart::ToolUse { id, .. } = p
122 && !matched.contains(id)
123 {
124 Some(id.clone())
125 } else {
126 None
127 }
128 })
129 .collect()
130}
131
132fn orphaned_tool_result_ids(msg: &Message, prev_msg: Option<&Message>) -> HashSet<String> {
134 let avail: HashSet<&str> = prev_msg
135 .filter(|p| p.role == Role::Assistant)
136 .map(|p| {
137 p.parts
138 .iter()
139 .filter_map(|part| {
140 if let MessagePart::ToolUse { id, .. } = part {
141 Some(id.as_str())
142 } else {
143 None
144 }
145 })
146 .collect()
147 })
148 .unwrap_or_default();
149 msg.parts
150 .iter()
151 .filter_map(|p| {
152 if let MessagePart::ToolResult { tool_use_id, .. } = p
153 && !avail.contains(tool_use_id.as_str())
154 {
155 Some(tool_use_id.clone())
156 } else {
157 None
158 }
159 })
160 .collect()
161}
162
/// Reports whether `content` holds anything besides tool markers.
///
/// The text is scanned for the tags `[tool_use: …]`, `[tool_result: …]` and
/// `[tool output: …]`:
/// * non-blank text in front of a tag makes the content meaningful;
/// * a tag without a closing `]` makes the content meaningful;
/// * after a `[tool_result: …]` / `[tool output: …]` tag, everything up to
///   the next tag (or the end of the text) is skipped;
/// * after a `[tool_use: …]` tag scanning resumes right behind the `]`.
///
/// Whatever is left once no tag remains decides the result: non-blank text
/// yields `true`, otherwise `false`.
fn has_meaningful_content(content: &str) -> bool {
    const TOOL_USE_TAG: &str = "[tool_use: ";
    const TAGS: [&str; 3] = [TOOL_USE_TAG, "[tool_result: ", "[tool output: "];

    /// Earliest tag occurrence in `text` as `(byte offset, tag)`.
    fn earliest_tag(text: &str) -> Option<(usize, &'static str)> {
        let mut best: Option<(usize, &'static str)> = None;
        for tag in TAGS.iter() {
            if let Some(at) = text.find(*tag) {
                let closer = match best {
                    Some((seen, _)) => at < seen,
                    None => true,
                };
                if closer {
                    best = Some((at, *tag));
                }
            }
        }
        best
    }

    let mut rest = content.trim();

    while let Some((at, tag)) = earliest_tag(rest) {
        let (lead, tagged) = rest.split_at(at);
        if !lead.trim().is_empty() {
            return true;
        }

        let inner = &tagged[tag.len()..];
        let Some(close) = inner.find(']') else {
            // Unterminated tag: treat the text as real content.
            return true;
        };
        let tail = &inner[close + 1..];

        rest = if tag == TOOL_USE_TAG {
            tail
        } else {
            // Result-style tags: drop the body that follows the tag.
            let body = tail.trim_start_matches('\n');
            match earliest_tag(body) {
                Some((next, _)) => &body[next..],
                None => &body[body.len()..],
            }
        };
    }

    !rest.trim().is_empty()
}
232
233fn strip_mid_history_orphans(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
246 let mut removed = 0;
247 let mut db_ids: Vec<i64> = Vec::new();
248 let mut i = 0;
249 while i < messages.len() {
250 if messages[i].role == Role::Assistant
254 && messages[i]
255 .parts
256 .iter()
257 .any(|p| matches!(p, MessagePart::ToolUse { .. }))
258 {
259 let next_non_system = (i + 1..messages.len())
260 .find(|&j| messages[j].role != Role::System)
261 .and_then(|j| messages.get(j));
262 let orphaned_ids = orphaned_tool_use_ids(&messages[i], next_non_system);
263 if !orphaned_ids.is_empty() {
264 tracing::warn!(
265 tool_ids = ?orphaned_ids,
266 index = i,
267 "stripping orphaned mid-history tool_use parts from assistant message"
268 );
269 messages[i].parts.retain(
270 |p| !matches!(p, MessagePart::ToolUse { id, .. } if orphaned_ids.contains(id)),
271 );
272 let is_empty =
273 !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
274 if is_empty {
275 if let Some(db_id) = messages[i].metadata.db_id {
276 db_ids.push(db_id);
277 }
278 messages.remove(i);
279 removed += 1;
280 continue; }
282 }
283 }
284
285 if messages[i].role == Role::User
287 && messages[i]
288 .parts
289 .iter()
290 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
291 {
292 let prev_non_system = (0..i)
293 .rev()
294 .find(|&j| messages[j].role != Role::System)
295 .and_then(|j| messages.get(j));
296 let orphaned_ids = orphaned_tool_result_ids(&messages[i], prev_non_system);
297 if !orphaned_ids.is_empty() {
298 tracing::warn!(
299 tool_use_ids = ?orphaned_ids,
300 index = i,
301 "stripping orphaned mid-history tool_result parts from user message"
302 );
303 messages[i].parts.retain(|p| {
304 !matches!(p, MessagePart::ToolResult { tool_use_id, .. } if orphaned_ids.contains(tool_use_id.as_str()))
305 });
306
307 let is_empty =
308 !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
309 if is_empty {
310 if let Some(db_id) = messages[i].metadata.db_id {
311 db_ids.push(db_id);
312 }
313 messages.remove(i);
314 removed += 1;
315 continue;
317 }
318 }
319 }
320
321 i += 1;
322 }
323 (removed, db_ids)
324}
325
326impl<C: Channel> Agent<C> {
327 pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
333 let (Some(memory), Some(cid)) = (
334 &self.memory_state.persistence.memory,
335 self.memory_state.persistence.conversation_id,
336 ) else {
337 return Ok(());
338 };
339
340 let history = memory
341 .sqlite()
342 .load_history_filtered(
343 cid,
344 self.memory_state.persistence.history_limit,
345 Some(true),
346 None,
347 )
348 .await?;
349 if !history.is_empty() {
350 let mut loaded = 0;
351 let mut skipped = 0;
352
353 for msg in history {
354 if !has_meaningful_content(&msg.content) && msg.parts.is_empty() {
359 tracing::warn!("skipping empty message from history (role: {:?})", msg.role);
360 skipped += 1;
361 continue;
362 }
363 self.msg.messages.push(msg);
364 loaded += 1;
365 }
366
367 let history_start = self.msg.messages.len() - loaded;
369 let mut restored_slice = self.msg.messages.split_off(history_start);
370 let (orphans, orphan_db_ids) = sanitize_tool_pairs(&mut restored_slice);
371 skipped += orphans;
372 loaded = loaded.saturating_sub(orphans);
373 self.msg.messages.append(&mut restored_slice);
374
375 if !orphan_db_ids.is_empty() {
376 let ids: Vec<zeph_memory::types::MessageId> = orphan_db_ids
377 .iter()
378 .map(|&id| zeph_memory::types::MessageId(id))
379 .collect();
380 if let Err(e) = memory.sqlite().soft_delete_messages(&ids).await {
381 tracing::warn!(
382 count = ids.len(),
383 error = %e,
384 "failed to soft-delete orphaned tool-pair messages from DB"
385 );
386 } else {
387 tracing::debug!(
388 count = ids.len(),
389 "soft-deleted orphaned tool-pair messages from DB"
390 );
391 }
392 }
393
394 tracing::info!("restored {loaded} message(s) from conversation {cid}");
395 if skipped > 0 {
396 tracing::warn!("skipped {skipped} empty/orphaned message(s) from history");
397 }
398
399 if loaded > 0 {
400 let _ = memory
403 .sqlite()
404 .increment_session_counts_for_conversation(cid)
405 .await
406 .inspect_err(|e| {
407 tracing::warn!(error = %e, "failed to increment tier session counts");
408 });
409 }
410 }
411
412 if let Ok(count) = memory.message_count(cid).await {
413 let count_u64 = u64::try_from(count).unwrap_or(0);
414 self.update_metrics(|m| {
415 m.sqlite_message_count = count_u64;
416 });
417 }
418
419 if let Ok(count) = memory.sqlite().count_semantic_facts().await {
420 let count_u64 = u64::try_from(count).unwrap_or(0);
421 self.update_metrics(|m| {
422 m.semantic_fact_count = count_u64;
423 });
424 }
425
426 if let Ok(count) = memory.unsummarized_message_count(cid).await {
427 self.memory_state.persistence.unsummarized_count = usize::try_from(count).unwrap_or(0);
428 }
429
430 self.recompute_prompt_tokens();
431 Ok(())
432 }
433
434 #[allow(clippy::too_many_lines)]
440 #[cfg_attr(
441 feature = "profiling",
442 tracing::instrument(name = "agent.persist_message", skip_all)
443 )]
444 pub(crate) async fn persist_message(
445 &mut self,
446 role: Role,
447 content: &str,
448 parts: &[MessagePart],
449 has_injection_flags: bool,
450 ) {
451 let (Some(memory), Some(cid)) = (
452 &self.memory_state.persistence.memory,
453 self.memory_state.persistence.conversation_id,
454 ) else {
455 return;
456 };
457
458 let parts_json = if parts.is_empty() {
459 "[]".to_string()
460 } else {
461 match serde_json::to_string(parts) {
462 Ok(json) => json,
463 Err(e) => {
464 tracing::error!(
465 role = ?role,
466 parts_count = parts.len(),
467 error = %e,
468 "failed to serialize message parts — skipping persist to avoid orphaned tool pair"
469 );
470 return;
471 }
472 }
473 };
474
475 let guard_event = self
479 .security
480 .exfiltration_guard
481 .should_guard_memory_write(has_injection_flags);
482 if let Some(ref event) = guard_event {
483 tracing::warn!(
484 ?event,
485 "exfiltration guard: skipping Qdrant embedding for flagged content"
486 );
487 self.update_metrics(|m| m.exfiltration_memory_guards += 1);
488 self.push_security_event(
489 crate::metrics::SecurityEventCategory::ExfiltrationBlock,
490 "memory_write",
491 "Qdrant embedding skipped: flagged content",
492 );
493 }
494
495 let skip_embedding = guard_event.is_some();
496
497 let has_skipped_tool_result = parts.iter().any(|p| {
501 if let MessagePart::ToolResult { content, .. } = p {
502 content.starts_with("[skipped]") || content.starts_with("[stopped]")
503 } else {
504 false
505 }
506 });
507
508 let should_embed = if skip_embedding || has_skipped_tool_result {
509 false
510 } else {
511 match role {
512 Role::Assistant => {
513 self.memory_state.persistence.autosave_assistant
514 && content.len() >= self.memory_state.persistence.autosave_min_length
515 }
516 _ => true,
517 }
518 };
519
520 let goal_text = self.memory_state.extraction.goal_text.clone();
521
522 tracing::debug!(
523 "persist_message: calling remember_with_parts, embed dispatched to background"
524 );
525 let (embedding_stored, was_persisted) = if should_embed {
526 match memory
527 .remember_with_parts(
528 cid,
529 role_str(role),
530 content,
531 &parts_json,
532 goal_text.as_deref(),
533 )
534 .await
535 {
536 Ok((Some(message_id), stored)) => {
537 self.msg.last_persisted_message_id = Some(message_id.0);
538 (stored, true)
539 }
540 Ok((None, _)) => {
541 return;
543 }
544 Err(e) => {
545 tracing::error!("failed to persist message: {e:#}");
546 return;
547 }
548 }
549 } else {
550 match memory
551 .save_only(cid, role_str(role), content, &parts_json)
552 .await
553 {
554 Ok(message_id) => {
555 self.msg.last_persisted_message_id = Some(message_id.0);
556 (false, true)
557 }
558 Err(e) => {
559 tracing::error!("failed to persist message: {e:#}");
560 return;
561 }
562 }
563 };
564
565 if !was_persisted {
566 return;
567 }
568
569 self.memory_state.persistence.unsummarized_count += 1;
570
571 self.update_metrics(|m| {
572 m.sqlite_message_count += 1;
573 if embedding_stored {
574 m.embeddings_generated += 1;
575 }
576 });
577
578 tracing::debug!("persist_message: db insert complete, embedding running in background");
579 memory.reap_embed_tasks();
580
581 self.enqueue_summarization_task();
585
586 let has_tool_result_parts = parts
589 .iter()
590 .any(|p| matches!(p, MessagePart::ToolResult { .. }));
591
592 self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
593 .await;
594
595 if role == Role::User && !has_tool_result_parts && !has_injection_flags {
597 self.enqueue_persona_extraction_task();
598 }
599
600 if has_tool_result_parts {
602 self.enqueue_trajectory_extraction_task();
603 }
604
605 let has_tool_use_parts = parts
610 .iter()
611 .any(|p| matches!(p, MessagePart::ToolUse { .. }));
612 if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
613 self.enqueue_reasoning_extraction_task();
614 }
615 }
616
617 fn enqueue_summarization_task(&mut self) {
619 let (Some(memory), Some(cid)) = (
620 self.memory_state.persistence.memory.clone(),
621 self.memory_state.persistence.conversation_id,
622 ) else {
623 return;
624 };
625
626 if self.memory_state.persistence.unsummarized_count
627 <= self.memory_state.compaction.summarization_threshold
628 {
629 return;
630 }
631
632 let batch_size = self.memory_state.compaction.summarization_threshold / 2;
633
634 self.lifecycle.supervisor.spawn_summarization("summarization", async move {
635 match tokio::time::timeout(
636 std::time::Duration::from_secs(30),
637 memory.summarize(cid, batch_size),
638 )
639 .await
640 {
641 Ok(Ok(Some(summary_id))) => {
642 tracing::info!(
643 "background summarization: created summary {summary_id} for conversation {cid}"
644 );
645 true
646 }
647 Ok(Ok(None)) => {
648 tracing::debug!("background summarization: no summarization needed");
649 false
650 }
651 Ok(Err(e)) => {
652 tracing::error!("background summarization failed: {e:#}");
653 false
654 }
655 Err(_) => {
656 tracing::warn!("background summarization timed out after 30s");
657 false
658 }
659 }
660 });
661 }
662
    /// Schedules background graph extraction for `content`, followed by a
    /// separate task that refreshes graph counters in the metrics channel.
    ///
    /// Returns without spawning anything when:
    /// * no memory backend or conversation id is configured,
    /// * `has_tool_result_parts` or `has_injection_flags` is set,
    /// * graph extraction is disabled in the graph config, or
    /// * `rpe_should_skip` reports that this turn should be skipped.
    ///
    /// Otherwise two supervisor tasks are spawned: `graph_extraction`
    /// (enrichment) and `graph_count_sync` (telemetry).
    #[allow(clippy::too_many_lines)]
    async fn enqueue_graph_extraction_task(
        &mut self,
        content: &str,
        has_injection_flags: bool,
        has_tool_result_parts: bool,
    ) {
        use zeph_memory::semantic::GraphExtractionConfig;

        if self.memory_state.persistence.memory.is_none()
            || self.memory_state.persistence.conversation_id.is_none()
        {
            return;
        }
        if has_tool_result_parts {
            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
            return;
        }
        if has_injection_flags {
            tracing::warn!("graph extraction skipped: injection patterns detected in content");
            return;
        }

        // Snapshot the agent-side graph config into the memory crate's config type.
        let extraction_cfg = {
            let cfg = &self.memory_state.extraction.graph_config;
            if !cfg.enabled {
                return;
            }
            GraphExtractionConfig {
                max_entities: cfg.max_entities_per_message,
                max_edges: cfg.max_edges_per_message,
                extraction_timeout_secs: cfg.extraction_timeout_secs,
                community_refresh_interval: cfg.community_refresh_interval,
                expired_edge_retention_days: cfg.expired_edge_retention_days,
                max_entities_cap: cfg.max_entities,
                community_summary_max_prompt_bytes: cfg.community_summary_max_prompt_bytes,
                community_summary_concurrency: cfg.community_summary_concurrency,
                lpa_edge_chunk_size: cfg.lpa_edge_chunk_size,
                note_linking: zeph_memory::NoteLinkingConfig {
                    enabled: cfg.note_linking.enabled,
                    similarity_threshold: cfg.note_linking.similarity_threshold,
                    top_k: cfg.note_linking.top_k,
                    timeout_secs: cfg.note_linking.timeout_secs,
                },
                link_weight_decay_lambda: cfg.link_weight_decay_lambda,
                link_weight_decay_interval_secs: cfg.link_weight_decay_interval_secs,
                belief_revision_enabled: cfg.belief_revision.enabled,
                belief_revision_similarity_threshold: cfg.belief_revision.similarity_threshold,
                conversation_id: self.memory_state.persistence.conversation_id.map(|c| c.0),
            }
        };

        if self.rpe_should_skip(content).await {
            tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
            return;
        }

        // Context: walking the message list backwards, take up to four user
        // messages that carry no ToolResult parts; each is cut to at most
        // 2048 bytes on a char boundary.
        let context_messages: Vec<String> = self
            .msg
            .messages
            .iter()
            .rev()
            .filter(|m| {
                m.role == Role::User
                    && !m
                        .parts
                        .iter()
                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
            })
            .take(4)
            .map(|m| {
                if m.content.len() > 2048 {
                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
                } else {
                    m.content.clone()
                }
            })
            .collect();

        let Some(memory) = self.memory_state.persistence.memory.clone() else {
            return;
        };

        // Optional post-extraction validator, present only when the memory
        // validator is enabled; its error is converted to a string.
        let validator: zeph_memory::semantic::PostExtractValidator =
            if self.security.memory_validator.is_enabled() {
                let v = self.security.memory_validator.clone();
                Some(Box::new(move |result| {
                    v.validate_graph_extraction(result)
                        .map_err(|e| e.to_string())
                }))
            } else {
                None
            };

        let content_owned = content.to_owned();
        let graph_store = memory.graph_store.clone();
        let metrics_tx = self.metrics.metrics_tx.clone();
        let start_time = self.lifecycle.start_time;

        self.lifecycle.supervisor.spawn(
            super::agent_supervisor::TaskClass::Enrichment,
            "graph_extraction",
            async move {
                let extraction_handle = memory.spawn_graph_extraction(
                    content_owned,
                    context_messages,
                    extraction_cfg,
                    validator,
                );

                // Counters are published only when both a graph store and a
                // metrics sender exist; either way the extraction is awaited
                // and its result discarded.
                if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
                    let _ = extraction_handle.await;
                    let (entities, edges, communities) = tokio::join!(
                        store.entity_count(),
                        store.active_edge_count(),
                        store.community_count()
                    );
                    let elapsed = start_time.elapsed().as_secs();
                    tx.send_modify(|m| {
                        m.uptime_seconds = elapsed;
                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
                    });
                } else {
                    let _ = extraction_handle.await;
                }

                tracing::debug!("background graph extraction complete");
            },
        );

        self.sync_community_detection_failures();
        self.sync_graph_extraction_metrics();
        // Captures for the telemetry task below.
        let memory_for_sync = self.memory_state.persistence.memory.clone();
        let metrics_tx_sync = self.metrics.metrics_tx.clone();
        let start_time_sync = self.lifecycle.start_time;
        let cid_sync = self.memory_state.persistence.conversation_id;
        let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
        let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
        // NOTE(review): this flag is read from `graph_config.enabled`, not from a
        // guidelines-specific setting — confirm that this is intended.
        let guidelines_enabled = self.memory_state.extraction.graph_config.enabled;

        self.lifecycle.supervisor.spawn(
            super::agent_supervisor::TaskClass::Telemetry,
            "graph_count_sync",
            async move {
                let Some(store) = graph_store_sync else {
                    return;
                };
                let Some(tx) = metrics_tx_sync else { return };

                let (entities, edges, communities) = tokio::join!(
                    store.entity_count(),
                    store.active_edge_count(),
                    store.community_count()
                );
                let elapsed = start_time_sync.elapsed().as_secs();
                tx.send_modify(|m| {
                    m.uptime_seconds = elapsed;
                    m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
                    m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
                    m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
                });

                // Guidelines metadata lookup is bounded by a 10-second timeout;
                // failures and timeouts are only logged at debug level.
                if guidelines_enabled && let Some(sqlite) = sqlite_sync {
                    match tokio::time::timeout(
                        std::time::Duration::from_secs(10),
                        sqlite.load_compression_guidelines_meta(cid_sync),
                    )
                    .await
                    {
                        Ok(Ok((version, created_at))) => {
                            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
                            let version_u32 = u32::try_from(version).unwrap_or(0);
                            tx.send_modify(|m| {
                                m.guidelines_version = version_u32;
                                m.guidelines_updated_at = created_at;
                            });
                        }
                        Ok(Err(e)) => {
                            tracing::debug!("guidelines status sync failed: {e:#}");
                        }
                        Err(_) => {
                            tracing::debug!("guidelines status sync timed out");
                        }
                    }
                }
            },
        );
    }
863
864 fn enqueue_persona_extraction_task(&mut self) {
866 use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
867
868 let cfg = &self.memory_state.extraction.persona_config;
869 if !cfg.enabled {
870 return;
871 }
872
873 let Some(memory) = &self.memory_state.persistence.memory else {
874 return;
875 };
876
877 let user_messages: Vec<String> = self
878 .msg
879 .messages
880 .iter()
881 .filter(|m| {
882 m.role == Role::User
883 && !m
884 .parts
885 .iter()
886 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
887 })
888 .take(8)
889 .map(|m| {
890 if m.content.len() > 2048 {
891 m.content[..m.content.floor_char_boundary(2048)].to_owned()
892 } else {
893 m.content.clone()
894 }
895 })
896 .collect();
897
898 if user_messages.len() < cfg.min_messages {
899 return;
900 }
901
902 let timeout_secs = cfg.extraction_timeout_secs;
903 let extraction_cfg = PersonaExtractionConfig {
904 enabled: cfg.enabled,
905 min_messages: cfg.min_messages,
906 max_messages: cfg.max_messages,
907 extraction_timeout_secs: timeout_secs,
908 };
909
910 let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
911 let store = memory.sqlite().clone();
912 let conversation_id = self.memory_state.persistence.conversation_id.map(|c| c.0);
913
914 self.lifecycle.supervisor.spawn(
915 super::agent_supervisor::TaskClass::Enrichment,
916 "persona_extraction",
917 async move {
918 let user_message_refs: Vec<&str> =
919 user_messages.iter().map(String::as_str).collect();
920 let fut = extract_persona_facts(
921 &store,
922 &provider,
923 &user_message_refs,
924 &extraction_cfg,
925 conversation_id,
926 );
927 match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
928 {
929 Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
930 Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
931 Err(_) => tracing::warn!(
932 timeout_secs,
933 "persona extraction timed out — no facts written this turn"
934 ),
935 }
936 },
937 );
938 }
939
940 fn enqueue_trajectory_extraction_task(&mut self) {
942 use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
943
944 let cfg = self.memory_state.extraction.trajectory_config.clone();
945 if !cfg.enabled {
946 return;
947 }
948
949 let Some(memory) = &self.memory_state.persistence.memory else {
950 return;
951 };
952
953 let conversation_id = match self.memory_state.persistence.conversation_id {
954 Some(cid) => cid.0,
955 None => return,
956 };
957
958 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
959 let turn_messages: Vec<zeph_llm::provider::Message> =
960 self.msg.messages[tail_start..].to_vec();
961
962 if turn_messages.is_empty() {
963 return;
964 }
965
966 let extraction_cfg = TrajectoryExtractionConfig {
967 enabled: cfg.enabled,
968 max_messages: cfg.max_messages,
969 extraction_timeout_secs: cfg.extraction_timeout_secs,
970 };
971
972 let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
973 let store = memory.sqlite().clone();
974 let min_confidence = cfg.min_confidence;
975
976 self.lifecycle.supervisor.spawn(
977 super::agent_supervisor::TaskClass::Enrichment,
978 "trajectory_extraction",
979 async move {
980 let entries =
981 match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
982 .await
983 {
984 Ok(e) => e,
985 Err(e) => {
986 tracing::warn!(error = %e, "trajectory extraction failed");
987 return;
988 }
989 };
990
991 let last_id = store
992 .trajectory_last_extracted_message_id(conversation_id)
993 .await
994 .unwrap_or(0);
995
996 let mut max_id = last_id;
997 for entry in &entries {
998 if entry.confidence < min_confidence {
999 continue;
1000 }
1001 let tools_json = serde_json::to_string(&entry.tools_used)
1002 .unwrap_or_else(|_| "[]".to_string());
1003 match store
1004 .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
1005 conversation_id: Some(conversation_id),
1006 turn_index: 0,
1007 kind: &entry.kind,
1008 intent: &entry.intent,
1009 outcome: &entry.outcome,
1010 tools_used: &tools_json,
1011 confidence: entry.confidence,
1012 })
1013 .await
1014 {
1015 Ok(id) => {
1016 if id > max_id {
1017 max_id = id;
1018 }
1019 }
1020 Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
1021 }
1022 }
1023
1024 if max_id > last_id {
1025 let _ = store
1026 .set_trajectory_last_extracted_message_id(conversation_id, max_id)
1027 .await;
1028 }
1029
1030 tracing::debug!(
1031 count = entries.len(),
1032 conversation_id,
1033 "trajectory extraction complete"
1034 );
1035 },
1036 );
1037 }
1038
1039 fn enqueue_reasoning_extraction_task(&mut self) {
1044 let cfg = self.memory_state.extraction.reasoning_config.clone();
1045 if !cfg.enabled {
1046 return;
1047 }
1048
1049 let Some(memory) = &self.memory_state.persistence.memory else {
1050 return;
1051 };
1052
1053 let Some(reasoning) = memory.reasoning.clone() else {
1054 return;
1055 };
1056
1057 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
1058 let turn_messages: Vec<zeph_llm::provider::Message> =
1059 self.msg.messages[tail_start..].to_vec();
1060
1061 if turn_messages.len() < cfg.min_messages {
1062 return;
1063 }
1064
1065 let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
1066 let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
1067 let embed_provider = memory.effective_embed_provider().clone();
1068 let store_limit = cfg.store_limit;
1069 let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
1070 let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
1071 let self_judge_window = cfg.self_judge_window;
1072 let min_assistant_chars = cfg.min_assistant_chars;
1073
1074 self.lifecycle.supervisor.spawn(
1075 super::agent_supervisor::TaskClass::Enrichment,
1076 "reasoning_extraction",
1077 async move {
1078 if let Err(e) = zeph_memory::process_reasoning_turn(
1079 &reasoning,
1080 &extract_provider,
1081 &distill_provider,
1082 &embed_provider,
1083 &turn_messages,
1084 zeph_memory::ProcessTurnConfig {
1085 store_limit,
1086 extraction_timeout,
1087 distill_timeout,
1088 self_judge_window,
1089 min_assistant_chars,
1090 },
1091 )
1092 .await
1093 {
1094 tracing::warn!(error = %e, "reasoning: process_turn failed");
1095 }
1096
1097 tracing::debug!("reasoning extraction complete");
1098 },
1099 );
1100 }
1101
1102 async fn rpe_should_skip(&mut self, content: &str) -> bool {
1107 let Some(ref rpe_mutex) = self.memory_state.extraction.rpe_router else {
1108 return false;
1109 };
1110 let Some(memory) = &self.memory_state.persistence.memory else {
1111 return false;
1112 };
1113 let candidates = zeph_memory::extract_candidate_entities(content);
1114 let provider = memory.provider();
1115 let Ok(Ok(emb_vec)) =
1116 tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
1117 else {
1118 return false; };
1120 if let Ok(mut router) = rpe_mutex.lock() {
1121 let signal = router.compute(&emb_vec, &candidates);
1122 router.push_embedding(emb_vec);
1123 router.push_entities(&candidates);
1124 !signal.should_extract
1125 } else {
1126 tracing::warn!("rpe_router mutex poisoned; falling through to extract");
1127 false
1128 }
1129 }
1130}
1131
1132#[cfg(test)]
1133mod tests {
1134 use super::super::agent_tests::{
1135 MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
1136 };
1137 use super::*;
1138 use zeph_llm::any::AnyProvider;
1139 use zeph_memory::semantic::SemanticMemory;
1140
1141 async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
1142 SemanticMemory::new(
1143 ":memory:",
1144 "http://127.0.0.1:1",
1145 provider.clone(),
1146 "test-model",
1147 )
1148 .await
1149 .unwrap()
1150 }
1151
1152 #[tokio::test]
1153 async fn load_history_without_memory_returns_ok() {
1154 let provider = mock_provider(vec![]);
1155 let channel = MockChannel::new(vec![]);
1156 let registry = create_test_registry();
1157 let executor = MockToolExecutor::no_tools();
1158 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1159
1160 let result = agent.load_history().await;
1161 assert!(result.is_ok());
1162 assert_eq!(agent.msg.messages.len(), 1); }
1165
1166 #[tokio::test]
1167 async fn load_history_with_messages_injects_into_agent() {
1168 let provider = mock_provider(vec![]);
1169 let channel = MockChannel::new(vec![]);
1170 let registry = create_test_registry();
1171 let executor = MockToolExecutor::no_tools();
1172
1173 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1174 let cid = memory.sqlite().create_conversation().await.unwrap();
1175
1176 memory
1177 .sqlite()
1178 .save_message(cid, "user", "hello from history")
1179 .await
1180 .unwrap();
1181 memory
1182 .sqlite()
1183 .save_message(cid, "assistant", "hi back")
1184 .await
1185 .unwrap();
1186
1187 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1188 std::sync::Arc::new(memory),
1189 cid,
1190 50,
1191 5,
1192 100,
1193 );
1194
1195 let messages_before = agent.msg.messages.len();
1196 agent.load_history().await.unwrap();
1197 assert_eq!(agent.msg.messages.len(), messages_before + 2);
1199 }
1200
1201 #[tokio::test]
1202 async fn load_history_skips_empty_messages() {
1203 let provider = mock_provider(vec![]);
1204 let channel = MockChannel::new(vec![]);
1205 let registry = create_test_registry();
1206 let executor = MockToolExecutor::no_tools();
1207
1208 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1209 let cid = memory.sqlite().create_conversation().await.unwrap();
1210
1211 memory
1213 .sqlite()
1214 .save_message(cid, "user", " ")
1215 .await
1216 .unwrap();
1217 memory
1218 .sqlite()
1219 .save_message(cid, "user", "real message")
1220 .await
1221 .unwrap();
1222
1223 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1224 std::sync::Arc::new(memory),
1225 cid,
1226 50,
1227 5,
1228 100,
1229 );
1230
1231 let messages_before = agent.msg.messages.len();
1232 agent.load_history().await.unwrap();
1233 assert_eq!(agent.msg.messages.len(), messages_before + 1);
1235 }
1236
1237 #[tokio::test]
1238 async fn load_history_with_empty_store_returns_ok() {
1239 let provider = mock_provider(vec![]);
1240 let channel = MockChannel::new(vec![]);
1241 let registry = create_test_registry();
1242 let executor = MockToolExecutor::no_tools();
1243
1244 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1245 let cid = memory.sqlite().create_conversation().await.unwrap();
1246
1247 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1248 std::sync::Arc::new(memory),
1249 cid,
1250 50,
1251 5,
1252 100,
1253 );
1254
1255 let messages_before = agent.msg.messages.len();
1256 agent.load_history().await.unwrap();
1257 assert_eq!(agent.msg.messages.len(), messages_before);
1259 }
1260
1261 #[tokio::test]
1262 async fn load_history_increments_session_count_for_existing_messages() {
1263 let provider = mock_provider(vec![]);
1264 let channel = MockChannel::new(vec![]);
1265 let registry = create_test_registry();
1266 let executor = MockToolExecutor::no_tools();
1267
1268 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1269 let cid = memory.sqlite().create_conversation().await.unwrap();
1270
1271 let id1 = memory
1273 .sqlite()
1274 .save_message(cid, "user", "hello")
1275 .await
1276 .unwrap();
1277 let id2 = memory
1278 .sqlite()
1279 .save_message(cid, "assistant", "hi")
1280 .await
1281 .unwrap();
1282
1283 let memory_arc = std::sync::Arc::new(memory);
1284 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1285 memory_arc.clone(),
1286 cid,
1287 50,
1288 5,
1289 100,
1290 );
1291
1292 agent.load_history().await.unwrap();
1293
1294 let counts: Vec<i64> = zeph_db::query_scalar(
1296 "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
1297 )
1298 .bind(id1)
1299 .bind(id2)
1300 .fetch_all(memory_arc.sqlite().pool())
1301 .await
1302 .unwrap();
1303 assert_eq!(
1304 counts,
1305 vec![1, 1],
1306 "session_count must be 1 after first restore"
1307 );
1308 }
1309
1310 #[tokio::test]
1311 async fn load_history_does_not_increment_session_count_for_new_conversation() {
1312 let provider = mock_provider(vec![]);
1313 let channel = MockChannel::new(vec![]);
1314 let registry = create_test_registry();
1315 let executor = MockToolExecutor::no_tools();
1316
1317 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1318 let cid = memory.sqlite().create_conversation().await.unwrap();
1319
1320 let memory_arc = std::sync::Arc::new(memory);
1322 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1323 memory_arc.clone(),
1324 cid,
1325 50,
1326 5,
1327 100,
1328 );
1329
1330 agent.load_history().await.unwrap();
1331
1332 let counts: Vec<i64> =
1334 zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
1335 .bind(cid)
1336 .fetch_all(memory_arc.sqlite().pool())
1337 .await
1338 .unwrap();
1339 assert!(counts.is_empty(), "new conversation must have no messages");
1340 }
1341
1342 #[tokio::test]
1343 async fn persist_message_without_memory_silently_returns() {
1344 let provider = mock_provider(vec![]);
1346 let channel = MockChannel::new(vec![]);
1347 let registry = create_test_registry();
1348 let executor = MockToolExecutor::no_tools();
1349 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1350
1351 agent.persist_message(Role::User, "hello", &[], false).await;
1353 }
1354
1355 #[tokio::test]
1356 async fn persist_message_assistant_autosave_false_uses_save_only() {
1357 let provider = mock_provider(vec![]);
1358 let channel = MockChannel::new(vec![]);
1359 let registry = create_test_registry();
1360 let executor = MockToolExecutor::no_tools();
1361
1362 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1363 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1364 let cid = memory.sqlite().create_conversation().await.unwrap();
1365
1366 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1367 .with_metrics(tx)
1368 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1369 agent.memory_state.persistence.autosave_assistant = false;
1370 agent.memory_state.persistence.autosave_min_length = 20;
1371
1372 agent
1373 .persist_message(Role::Assistant, "short assistant reply", &[], false)
1374 .await;
1375
1376 let history = agent
1377 .memory_state
1378 .persistence
1379 .memory
1380 .as_ref()
1381 .unwrap()
1382 .sqlite()
1383 .load_history(cid, 50)
1384 .await
1385 .unwrap();
1386 assert_eq!(history.len(), 1, "message must be saved");
1387 assert_eq!(history[0].content, "short assistant reply");
1388 assert_eq!(rx.borrow().embeddings_generated, 0);
1390 }
1391
1392 #[tokio::test]
1393 async fn persist_message_assistant_below_min_length_uses_save_only() {
1394 let provider = mock_provider(vec![]);
1395 let channel = MockChannel::new(vec![]);
1396 let registry = create_test_registry();
1397 let executor = MockToolExecutor::no_tools();
1398
1399 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1400 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1401 let cid = memory.sqlite().create_conversation().await.unwrap();
1402
1403 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1405 .with_metrics(tx)
1406 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1407 agent.memory_state.persistence.autosave_assistant = true;
1408 agent.memory_state.persistence.autosave_min_length = 1000;
1409
1410 agent
1411 .persist_message(Role::Assistant, "too short", &[], false)
1412 .await;
1413
1414 let history = agent
1415 .memory_state
1416 .persistence
1417 .memory
1418 .as_ref()
1419 .unwrap()
1420 .sqlite()
1421 .load_history(cid, 50)
1422 .await
1423 .unwrap();
1424 assert_eq!(history.len(), 1, "message must be saved");
1425 assert_eq!(history[0].content, "too short");
1426 assert_eq!(rx.borrow().embeddings_generated, 0);
1427 }
1428
1429 #[tokio::test]
1430 async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1431 let provider = mock_provider(vec![]);
1433 let channel = MockChannel::new(vec![]);
1434 let registry = create_test_registry();
1435 let executor = MockToolExecutor::no_tools();
1436
1437 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1438 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1439 let cid = memory.sqlite().create_conversation().await.unwrap();
1440
1441 let min_length = 10usize;
1442 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1443 .with_metrics(tx)
1444 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1445 agent.memory_state.persistence.autosave_assistant = true;
1446 agent.memory_state.persistence.autosave_min_length = min_length;
1447
1448 let content_at_boundary = "A".repeat(min_length);
1450 assert_eq!(content_at_boundary.len(), min_length);
1451 agent
1452 .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1453 .await;
1454
1455 assert_eq!(rx.borrow().sqlite_message_count, 1);
1457 }
1458
1459 #[tokio::test]
1460 async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1461 let provider = mock_provider(vec![]);
1463 let channel = MockChannel::new(vec![]);
1464 let registry = create_test_registry();
1465 let executor = MockToolExecutor::no_tools();
1466
1467 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1468 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1469 let cid = memory.sqlite().create_conversation().await.unwrap();
1470
1471 let min_length = 10usize;
1472 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1473 .with_metrics(tx)
1474 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1475 agent.memory_state.persistence.autosave_assistant = true;
1476 agent.memory_state.persistence.autosave_min_length = min_length;
1477
1478 let content_below_boundary = "A".repeat(min_length - 1);
1480 assert_eq!(content_below_boundary.len(), min_length - 1);
1481 agent
1482 .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1483 .await;
1484
1485 let history = agent
1486 .memory_state
1487 .persistence
1488 .memory
1489 .as_ref()
1490 .unwrap()
1491 .sqlite()
1492 .load_history(cid, 50)
1493 .await
1494 .unwrap();
1495 assert_eq!(history.len(), 1, "message must still be saved");
1496 assert_eq!(rx.borrow().embeddings_generated, 0);
1498 }
1499
1500 #[tokio::test]
1501 async fn persist_message_increments_unsummarized_count() {
1502 let provider = mock_provider(vec![]);
1503 let channel = MockChannel::new(vec![]);
1504 let registry = create_test_registry();
1505 let executor = MockToolExecutor::no_tools();
1506
1507 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1508 let cid = memory.sqlite().create_conversation().await.unwrap();
1509
1510 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1512 std::sync::Arc::new(memory),
1513 cid,
1514 50,
1515 5,
1516 100,
1517 );
1518
1519 assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1520
1521 agent.persist_message(Role::User, "first", &[], false).await;
1522 assert_eq!(agent.memory_state.persistence.unsummarized_count, 1);
1523
1524 agent
1525 .persist_message(Role::User, "second", &[], false)
1526 .await;
1527 assert_eq!(agent.memory_state.persistence.unsummarized_count, 2);
1528 }
1529
1530 #[tokio::test]
1531 async fn check_summarization_resets_counter_on_success() {
1532 let provider = mock_provider(vec![]);
1533 let channel = MockChannel::new(vec![]);
1534 let registry = create_test_registry();
1535 let executor = MockToolExecutor::no_tools();
1536
1537 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1538 let cid = memory.sqlite().create_conversation().await.unwrap();
1539
1540 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1542 std::sync::Arc::new(memory),
1543 cid,
1544 50,
1545 5,
1546 1,
1547 );
1548
1549 agent.persist_message(Role::User, "msg1", &[], false).await;
1550 agent.persist_message(Role::User, "msg2", &[], false).await;
1551
1552 assert!(agent.memory_state.persistence.unsummarized_count <= 2);
1557 }
1558
1559 #[tokio::test]
1560 async fn unsummarized_count_not_incremented_without_memory() {
1561 let provider = mock_provider(vec![]);
1562 let channel = MockChannel::new(vec![]);
1563 let registry = create_test_registry();
1564 let executor = MockToolExecutor::no_tools();
1565 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1566
1567 agent.persist_message(Role::User, "hello", &[], false).await;
1568 assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1570 }
1571
1572 mod graph_extraction_guards {
1574 use super::*;
1575 use crate::config::GraphConfig;
1576 use zeph_llm::provider::MessageMetadata;
1577 use zeph_memory::graph::GraphStore;
1578
1579 fn enabled_graph_config() -> GraphConfig {
1580 GraphConfig {
1581 enabled: true,
1582 ..GraphConfig::default()
1583 }
1584 }
1585
1586 async fn agent_with_graph(
1587 provider: &AnyProvider,
1588 config: GraphConfig,
1589 ) -> Agent<MockChannel> {
1590 let memory =
1591 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1592 let cid = memory.sqlite().create_conversation().await.unwrap();
1593 Agent::new(
1594 provider.clone(),
1595 MockChannel::new(vec![]),
1596 create_test_registry(),
1597 None,
1598 5,
1599 MockToolExecutor::no_tools(),
1600 )
1601 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1602 .with_graph_config(config)
1603 }
1604
1605 #[tokio::test]
1606 async fn injection_flag_guard_skips_extraction() {
1607 let provider = mock_provider(vec![]);
1609 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1610 let pool = agent
1611 .memory_state
1612 .persistence
1613 .memory
1614 .as_ref()
1615 .unwrap()
1616 .sqlite()
1617 .pool()
1618 .clone();
1619
1620 agent
1621 .enqueue_graph_extraction_task("I use Rust", true, false)
1622 .await;
1623
1624 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1626
1627 let store = GraphStore::new(pool);
1628 let count = store.get_metadata("extraction_count").await.unwrap();
1629 assert!(
1630 count.is_none(),
1631 "injection flag must prevent extraction_count from being written"
1632 );
1633 }
1634
1635 #[tokio::test]
1636 async fn disabled_config_guard_skips_extraction() {
1637 let provider = mock_provider(vec![]);
1639 let disabled_cfg = GraphConfig {
1640 enabled: false,
1641 ..GraphConfig::default()
1642 };
1643 let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1644 let pool = agent
1645 .memory_state
1646 .persistence
1647 .memory
1648 .as_ref()
1649 .unwrap()
1650 .sqlite()
1651 .pool()
1652 .clone();
1653
1654 agent
1655 .enqueue_graph_extraction_task("I use Rust", false, false)
1656 .await;
1657
1658 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1659
1660 let store = GraphStore::new(pool);
1661 let count = store.get_metadata("extraction_count").await.unwrap();
1662 assert!(
1663 count.is_none(),
1664 "disabled graph config must prevent extraction"
1665 );
1666 }
1667
1668 #[tokio::test]
1669 async fn happy_path_fires_extraction() {
1670 let provider = mock_provider(vec![]);
1673 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1674 let pool = agent
1675 .memory_state
1676 .persistence
1677 .memory
1678 .as_ref()
1679 .unwrap()
1680 .sqlite()
1681 .pool()
1682 .clone();
1683
1684 agent
1685 .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1686 .await;
1687
1688 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1690
1691 let store = GraphStore::new(pool);
1692 let count = store.get_metadata("extraction_count").await.unwrap();
1693 assert!(
1694 count.is_some(),
1695 "happy-path extraction must increment extraction_count"
1696 );
1697 }
1698
1699 #[tokio::test]
1700 async fn tool_result_parts_guard_skips_extraction() {
1701 let provider = mock_provider(vec![]);
1705 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1706 let pool = agent
1707 .memory_state
1708 .persistence
1709 .memory
1710 .as_ref()
1711 .unwrap()
1712 .sqlite()
1713 .pool()
1714 .clone();
1715
1716 agent
1717 .enqueue_graph_extraction_task(
1718 "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1719 false,
1720 true, )
1722 .await;
1723
1724 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1725
1726 let store = GraphStore::new(pool);
1727 let count = store.get_metadata("extraction_count").await.unwrap();
1728 assert!(
1729 count.is_none(),
1730 "tool result message must not trigger graph extraction"
1731 );
1732 }
1733
1734 #[tokio::test]
1735 async fn context_filter_excludes_tool_result_messages() {
1736 let provider = mock_provider(vec![]);
1747 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1748
1749 agent.msg.messages.push(Message {
1752 role: Role::User,
1753 content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1754 parts: vec![MessagePart::ToolResult {
1755 tool_use_id: "abc".to_owned(),
1756 content: "provider_type = \"openai\"".to_owned(),
1757 is_error: false,
1758 }],
1759 metadata: MessageMetadata::default(),
1760 });
1761
1762 let pool = agent
1763 .memory_state
1764 .persistence
1765 .memory
1766 .as_ref()
1767 .unwrap()
1768 .sqlite()
1769 .pool()
1770 .clone();
1771
1772 agent
1774 .enqueue_graph_extraction_task(
1775 "I prefer Rust for systems programming",
1776 false,
1777 false,
1778 )
1779 .await;
1780
1781 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1782
1783 let store = GraphStore::new(pool);
1785 let count = store.get_metadata("extraction_count").await.unwrap();
1786 assert!(
1787 count.is_some(),
1788 "conversational message must trigger extraction even with prior tool result in history"
1789 );
1790 }
1791 }
1792
1793 mod persona_extraction_guards {
1795 use super::*;
1796 use zeph_config::PersonaConfig;
1797 use zeph_llm::provider::MessageMetadata;
1798
1799 fn enabled_persona_config() -> PersonaConfig {
1800 PersonaConfig {
1801 enabled: true,
1802 min_messages: 1,
1803 ..PersonaConfig::default()
1804 }
1805 }
1806
1807 async fn agent_with_persona(
1808 provider: &AnyProvider,
1809 config: PersonaConfig,
1810 ) -> Agent<MockChannel> {
1811 let memory =
1812 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1813 let cid = memory.sqlite().create_conversation().await.unwrap();
1814 let mut agent = Agent::new(
1815 provider.clone(),
1816 MockChannel::new(vec![]),
1817 create_test_registry(),
1818 None,
1819 5,
1820 MockToolExecutor::no_tools(),
1821 )
1822 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1823 agent.memory_state.extraction.persona_config = config;
1824 agent
1825 }
1826
1827 #[tokio::test]
1828 async fn disabled_config_skips_spawn() {
1829 let provider = mock_provider(vec![]);
1831 let mut agent = agent_with_persona(
1832 &provider,
1833 PersonaConfig {
1834 enabled: false,
1835 ..PersonaConfig::default()
1836 },
1837 )
1838 .await;
1839
1840 agent.msg.messages.push(zeph_llm::provider::Message {
1842 role: Role::User,
1843 content: "I prefer Rust for systems programming".to_owned(),
1844 parts: vec![],
1845 metadata: MessageMetadata::default(),
1846 });
1847
1848 agent.enqueue_persona_extraction_task();
1849
1850 let store = agent
1851 .memory_state
1852 .persistence
1853 .memory
1854 .as_ref()
1855 .unwrap()
1856 .sqlite()
1857 .clone();
1858 let count = store.count_persona_facts().await.unwrap();
1859 assert_eq!(count, 0, "disabled persona config must not write any facts");
1860 }
1861
1862 #[tokio::test]
1863 async fn below_min_messages_skips_spawn() {
1864 let provider = mock_provider(vec![]);
1866 let mut agent = agent_with_persona(
1867 &provider,
1868 PersonaConfig {
1869 enabled: true,
1870 min_messages: 3,
1871 ..PersonaConfig::default()
1872 },
1873 )
1874 .await;
1875
1876 for text in ["I use Rust", "I prefer async code"] {
1877 agent.msg.messages.push(zeph_llm::provider::Message {
1878 role: Role::User,
1879 content: text.to_owned(),
1880 parts: vec![],
1881 metadata: MessageMetadata::default(),
1882 });
1883 }
1884
1885 agent.enqueue_persona_extraction_task();
1886
1887 let store = agent
1888 .memory_state
1889 .persistence
1890 .memory
1891 .as_ref()
1892 .unwrap()
1893 .sqlite()
1894 .clone();
1895 let count = store.count_persona_facts().await.unwrap();
1896 assert_eq!(
1897 count, 0,
1898 "below min_messages threshold must not trigger extraction"
1899 );
1900 }
1901
1902 #[tokio::test]
1903 async fn no_memory_skips_spawn() {
1904 let provider = mock_provider(vec![]);
1906 let channel = MockChannel::new(vec![]);
1907 let registry = create_test_registry();
1908 let executor = MockToolExecutor::no_tools();
1909 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1910 agent.memory_state.extraction.persona_config = enabled_persona_config();
1911 agent.msg.messages.push(zeph_llm::provider::Message {
1912 role: Role::User,
1913 content: "I like Rust".to_owned(),
1914 parts: vec![],
1915 metadata: MessageMetadata::default(),
1916 });
1917
1918 agent.enqueue_persona_extraction_task();
1920 }
1921
1922 #[tokio::test]
1923 async fn enabled_enough_messages_spawns_extraction() {
1924 use zeph_llm::mock::MockProvider;
1927 let (mock, recorded) = MockProvider::default().with_recording();
1928 let provider = AnyProvider::Mock(mock);
1929 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1930
1931 agent.msg.messages.push(zeph_llm::provider::Message {
1932 role: Role::User,
1933 content: "I prefer Rust for systems programming".to_owned(),
1934 parts: vec![],
1935 metadata: MessageMetadata::default(),
1936 });
1937
1938 agent.enqueue_persona_extraction_task();
1939
1940 agent.lifecycle.supervisor.join_all_for_test().await;
1942
1943 let calls = recorded.lock().unwrap();
1944 assert!(
1945 !calls.is_empty(),
1946 "happy-path: provider.chat() must be called when extraction completes"
1947 );
1948 }
1949
1950 #[tokio::test]
1951 async fn messages_capped_at_eight() {
1952 use zeph_llm::mock::MockProvider;
1955 let (mock, recorded) = MockProvider::default().with_recording();
1956 let provider = AnyProvider::Mock(mock);
1957 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1958
1959 for i in 0..12u32 {
1960 agent.msg.messages.push(zeph_llm::provider::Message {
1961 role: Role::User,
1962 content: format!("I like message {i}"),
1963 parts: vec![],
1964 metadata: MessageMetadata::default(),
1965 });
1966 }
1967
1968 agent.enqueue_persona_extraction_task();
1969
1970 agent.lifecycle.supervisor.join_all_for_test().await;
1972
1973 let calls = recorded.lock().unwrap();
1975 assert!(
1976 !calls.is_empty(),
1977 "extraction must run when enough messages present"
1978 );
1979 let prompt = &calls[0];
1981 let user_text = prompt
1982 .iter()
1983 .filter(|m| m.role == Role::User)
1984 .map(|m| m.content.as_str())
1985 .collect::<Vec<_>>()
1986 .join(" ");
1987 assert!(
1989 !user_text.contains("I like message 8"),
1990 "message index 8 must be excluded from extraction input"
1991 );
1992 }
1993
1994 #[test]
1995 fn long_message_truncated_at_char_boundary() {
1996 let long_content = "x".repeat(3000);
2000 let truncated = if long_content.len() > 2048 {
2001 long_content[..long_content.floor_char_boundary(2048)].to_owned()
2002 } else {
2003 long_content.clone()
2004 };
2005 assert_eq!(
2006 truncated.len(),
2007 2048,
2008 "ASCII content must be truncated to exactly 2048 bytes"
2009 );
2010
2011 let multi = "é".repeat(1500); let truncated_multi = if multi.len() > 2048 {
2014 multi[..multi.floor_char_boundary(2048)].to_owned()
2015 } else {
2016 multi.clone()
2017 };
2018 assert!(
2019 truncated_multi.len() <= 2048,
2020 "multi-byte content must not exceed 2048 bytes"
2021 );
2022 assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
2023 }
2024 }
2025
2026 #[tokio::test]
2027 async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
2028 let provider = mock_provider(vec![]);
2029 let channel = MockChannel::new(vec![]);
2030 let registry = create_test_registry();
2031 let executor = MockToolExecutor::no_tools();
2032
2033 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
2034 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2035 let cid = memory.sqlite().create_conversation().await.unwrap();
2036
2037 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
2039 .with_metrics(tx)
2040 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
2041 agent.memory_state.persistence.autosave_assistant = false;
2042 agent.memory_state.persistence.autosave_min_length = 20;
2043
2044 let long_user_msg = "A".repeat(100);
2045 agent
2046 .persist_message(Role::User, &long_user_msg, &[], false)
2047 .await;
2048
2049 let history = agent
2050 .memory_state
2051 .persistence
2052 .memory
2053 .as_ref()
2054 .unwrap()
2055 .sqlite()
2056 .load_history(cid, 50)
2057 .await
2058 .unwrap();
2059 assert_eq!(history.len(), 1, "user message must be saved");
2060 assert_eq!(rx.borrow().sqlite_message_count, 1);
2063 }
2064
2065 #[tokio::test]
2069 async fn persist_message_saves_correct_tool_use_parts() {
2070 use zeph_llm::provider::MessagePart;
2071
2072 let provider = mock_provider(vec![]);
2073 let channel = MockChannel::new(vec![]);
2074 let registry = create_test_registry();
2075 let executor = MockToolExecutor::no_tools();
2076
2077 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2078 let cid = memory.sqlite().create_conversation().await.unwrap();
2079
2080 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2081 std::sync::Arc::new(memory),
2082 cid,
2083 50,
2084 5,
2085 100,
2086 );
2087
2088 let parts = vec![MessagePart::ToolUse {
2089 id: "call_abc123".to_string(),
2090 name: "read_file".to_string(),
2091 input: serde_json::json!({"path": "/tmp/test.txt"}),
2092 }];
2093 let content = "[tool_use: read_file(call_abc123)]";
2094
2095 agent
2096 .persist_message(Role::Assistant, content, &parts, false)
2097 .await;
2098
2099 let history = agent
2100 .memory_state
2101 .persistence
2102 .memory
2103 .as_ref()
2104 .unwrap()
2105 .sqlite()
2106 .load_history(cid, 50)
2107 .await
2108 .unwrap();
2109
2110 assert_eq!(history.len(), 1);
2111 assert_eq!(history[0].role, Role::Assistant);
2112 assert_eq!(history[0].content, content);
2113 assert_eq!(history[0].parts.len(), 1);
2114 match &history[0].parts[0] {
2115 MessagePart::ToolUse { id, name, .. } => {
2116 assert_eq!(id, "call_abc123");
2117 assert_eq!(name, "read_file");
2118 }
2119 other => panic!("expected ToolUse part, got {other:?}"),
2120 }
2121 assert!(
2123 !history[0]
2124 .parts
2125 .iter()
2126 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2127 "assistant message must not contain ToolResult parts"
2128 );
2129 }
2130
2131 #[tokio::test]
2132 async fn persist_message_saves_correct_tool_result_parts() {
2133 use zeph_llm::provider::MessagePart;
2134
2135 let provider = mock_provider(vec![]);
2136 let channel = MockChannel::new(vec![]);
2137 let registry = create_test_registry();
2138 let executor = MockToolExecutor::no_tools();
2139
2140 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2141 let cid = memory.sqlite().create_conversation().await.unwrap();
2142
2143 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2144 std::sync::Arc::new(memory),
2145 cid,
2146 50,
2147 5,
2148 100,
2149 );
2150
2151 let parts = vec![MessagePart::ToolResult {
2152 tool_use_id: "call_abc123".to_string(),
2153 content: "file contents here".to_string(),
2154 is_error: false,
2155 }];
2156 let content = "[tool_result: call_abc123]\nfile contents here";
2157
2158 agent
2159 .persist_message(Role::User, content, &parts, false)
2160 .await;
2161
2162 let history = agent
2163 .memory_state
2164 .persistence
2165 .memory
2166 .as_ref()
2167 .unwrap()
2168 .sqlite()
2169 .load_history(cid, 50)
2170 .await
2171 .unwrap();
2172
2173 assert_eq!(history.len(), 1);
2174 assert_eq!(history[0].role, Role::User);
2175 assert_eq!(history[0].content, content);
2176 assert_eq!(history[0].parts.len(), 1);
2177 match &history[0].parts[0] {
2178 MessagePart::ToolResult {
2179 tool_use_id,
2180 content: result_content,
2181 is_error,
2182 } => {
2183 assert_eq!(tool_use_id, "call_abc123");
2184 assert_eq!(result_content, "file contents here");
2185 assert!(!is_error);
2186 }
2187 other => panic!("expected ToolResult part, got {other:?}"),
2188 }
2189 assert!(
2191 !history[0]
2192 .parts
2193 .iter()
2194 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2195 "user ToolResult message must not contain ToolUse parts"
2196 );
2197 }
2198
2199 #[tokio::test]
2200 async fn persist_message_roundtrip_preserves_role_part_alignment() {
2201 use zeph_llm::provider::MessagePart;
2202
2203 let provider = mock_provider(vec![]);
2204 let channel = MockChannel::new(vec![]);
2205 let registry = create_test_registry();
2206 let executor = MockToolExecutor::no_tools();
2207
2208 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2209 let cid = memory.sqlite().create_conversation().await.unwrap();
2210
2211 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2212 std::sync::Arc::new(memory),
2213 cid,
2214 50,
2215 5,
2216 100,
2217 );
2218
2219 let assistant_parts = vec![MessagePart::ToolUse {
2221 id: "id_1".to_string(),
2222 name: "list_dir".to_string(),
2223 input: serde_json::json!({"path": "/tmp"}),
2224 }];
2225 agent
2226 .persist_message(
2227 Role::Assistant,
2228 "[tool_use: list_dir(id_1)]",
2229 &assistant_parts,
2230 false,
2231 )
2232 .await;
2233
2234 let user_parts = vec![MessagePart::ToolResult {
2236 tool_use_id: "id_1".to_string(),
2237 content: "file1.txt\nfile2.txt".to_string(),
2238 is_error: false,
2239 }];
2240 agent
2241 .persist_message(
2242 Role::User,
2243 "[tool_result: id_1]\nfile1.txt\nfile2.txt",
2244 &user_parts,
2245 false,
2246 )
2247 .await;
2248
2249 let history = agent
2250 .memory_state
2251 .persistence
2252 .memory
2253 .as_ref()
2254 .unwrap()
2255 .sqlite()
2256 .load_history(cid, 50)
2257 .await
2258 .unwrap();
2259
2260 assert_eq!(history.len(), 2);
2261
2262 assert_eq!(history[0].role, Role::Assistant);
2264 assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
2265 assert!(
2266 matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
2267 "first message must be assistant ToolUse"
2268 );
2269
2270 assert_eq!(history[1].role, Role::User);
2272 assert_eq!(
2273 history[1].content,
2274 "[tool_result: id_1]\nfile1.txt\nfile2.txt"
2275 );
2276 assert!(
2277 matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
2278 "second message must be user ToolResult"
2279 );
2280
2281 assert!(
2283 !history[0]
2284 .parts
2285 .iter()
2286 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2287 "assistant message must not have ToolResult parts"
2288 );
2289 assert!(
2290 !history[1]
2291 .parts
2292 .iter()
2293 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2294 "user message must not have ToolUse parts"
2295 );
2296 }
2297
2298 #[tokio::test]
2299 async fn persist_message_saves_correct_tool_output_parts() {
2300 use zeph_llm::provider::MessagePart;
2301
2302 let provider = mock_provider(vec![]);
2303 let channel = MockChannel::new(vec![]);
2304 let registry = create_test_registry();
2305 let executor = MockToolExecutor::no_tools();
2306
2307 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2308 let cid = memory.sqlite().create_conversation().await.unwrap();
2309
2310 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2311 std::sync::Arc::new(memory),
2312 cid,
2313 50,
2314 5,
2315 100,
2316 );
2317
2318 let parts = vec![MessagePart::ToolOutput {
2319 tool_name: "shell".into(),
2320 body: "hello from shell".to_string(),
2321 compacted_at: None,
2322 }];
2323 let content = "[tool: shell]\nhello from shell";
2324
2325 agent
2326 .persist_message(Role::User, content, &parts, false)
2327 .await;
2328
2329 let history = agent
2330 .memory_state
2331 .persistence
2332 .memory
2333 .as_ref()
2334 .unwrap()
2335 .sqlite()
2336 .load_history(cid, 50)
2337 .await
2338 .unwrap();
2339
2340 assert_eq!(history.len(), 1);
2341 assert_eq!(history[0].role, Role::User);
2342 assert_eq!(history[0].content, content);
2343 assert_eq!(history[0].parts.len(), 1);
2344 match &history[0].parts[0] {
2345 MessagePart::ToolOutput {
2346 tool_name,
2347 body,
2348 compacted_at,
2349 } => {
2350 assert_eq!(tool_name, "shell");
2351 assert_eq!(body, "hello from shell");
2352 assert!(compacted_at.is_none());
2353 }
2354 other => panic!("expected ToolOutput part, got {other:?}"),
2355 }
2356 }
2357
2358 #[tokio::test]
2361 async fn load_history_removes_trailing_orphan_tool_use() {
2362 use zeph_llm::provider::MessagePart;
2363
2364 let provider = mock_provider(vec![]);
2365 let channel = MockChannel::new(vec![]);
2366 let registry = create_test_registry();
2367 let executor = MockToolExecutor::no_tools();
2368
2369 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2370 let cid = memory.sqlite().create_conversation().await.unwrap();
2371 let sqlite = memory.sqlite();
2372
2373 sqlite
2375 .save_message(cid, "user", "do something with a tool")
2376 .await
2377 .unwrap();
2378
2379 let parts = serde_json::to_string(&[MessagePart::ToolUse {
2381 id: "call_orphan".to_string(),
2382 name: "shell".to_string(),
2383 input: serde_json::json!({"command": "ls"}),
2384 }])
2385 .unwrap();
2386 sqlite
2387 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2388 .await
2389 .unwrap();
2390
2391 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2392 std::sync::Arc::new(memory),
2393 cid,
2394 50,
2395 5,
2396 100,
2397 );
2398
2399 let messages_before = agent.msg.messages.len();
2400 agent.load_history().await.unwrap();
2401
2402 assert_eq!(
2404 agent.msg.messages.len(),
2405 messages_before + 1,
2406 "orphaned trailing tool_use must be removed"
2407 );
2408 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2409 }
2410
2411 #[tokio::test]
2412 async fn load_history_removes_leading_orphan_tool_result() {
2413 use zeph_llm::provider::MessagePart;
2414
2415 let provider = mock_provider(vec![]);
2416 let channel = MockChannel::new(vec![]);
2417 let registry = create_test_registry();
2418 let executor = MockToolExecutor::no_tools();
2419
2420 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2421 let cid = memory.sqlite().create_conversation().await.unwrap();
2422 let sqlite = memory.sqlite();
2423
2424 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2426 tool_use_id: "call_missing".to_string(),
2427 content: "result data".to_string(),
2428 is_error: false,
2429 }])
2430 .unwrap();
2431 sqlite
2432 .save_message_with_parts(
2433 cid,
2434 "user",
2435 "[tool_result: call_missing]\nresult data",
2436 &result_parts,
2437 )
2438 .await
2439 .unwrap();
2440
2441 sqlite
2443 .save_message(cid, "assistant", "here is my response")
2444 .await
2445 .unwrap();
2446
2447 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2448 std::sync::Arc::new(memory),
2449 cid,
2450 50,
2451 5,
2452 100,
2453 );
2454
2455 let messages_before = agent.msg.messages.len();
2456 agent.load_history().await.unwrap();
2457
2458 assert_eq!(
2460 agent.msg.messages.len(),
2461 messages_before + 1,
2462 "orphaned leading tool_result must be removed"
2463 );
2464 assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2465 }
2466
2467 #[tokio::test]
2468 async fn load_history_preserves_complete_tool_pairs() {
2469 use zeph_llm::provider::MessagePart;
2470
2471 let provider = mock_provider(vec![]);
2472 let channel = MockChannel::new(vec![]);
2473 let registry = create_test_registry();
2474 let executor = MockToolExecutor::no_tools();
2475
2476 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2477 let cid = memory.sqlite().create_conversation().await.unwrap();
2478 let sqlite = memory.sqlite();
2479
2480 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2482 id: "call_ok".to_string(),
2483 name: "shell".to_string(),
2484 input: serde_json::json!({"command": "pwd"}),
2485 }])
2486 .unwrap();
2487 sqlite
2488 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2489 .await
2490 .unwrap();
2491
2492 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2493 tool_use_id: "call_ok".to_string(),
2494 content: "/home/user".to_string(),
2495 is_error: false,
2496 }])
2497 .unwrap();
2498 sqlite
2499 .save_message_with_parts(
2500 cid,
2501 "user",
2502 "[tool_result: call_ok]\n/home/user",
2503 &result_parts,
2504 )
2505 .await
2506 .unwrap();
2507
2508 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2509 std::sync::Arc::new(memory),
2510 cid,
2511 50,
2512 5,
2513 100,
2514 );
2515
2516 let messages_before = agent.msg.messages.len();
2517 agent.load_history().await.unwrap();
2518
2519 assert_eq!(
2521 agent.msg.messages.len(),
2522 messages_before + 2,
2523 "complete tool_use/tool_result pair must be preserved"
2524 );
2525 assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2526 assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2527 }
2528
2529 #[tokio::test]
2530 async fn load_history_handles_multiple_trailing_orphans() {
2531 use zeph_llm::provider::MessagePart;
2532
2533 let provider = mock_provider(vec![]);
2534 let channel = MockChannel::new(vec![]);
2535 let registry = create_test_registry();
2536 let executor = MockToolExecutor::no_tools();
2537
2538 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2539 let cid = memory.sqlite().create_conversation().await.unwrap();
2540 let sqlite = memory.sqlite();
2541
2542 sqlite.save_message(cid, "user", "start").await.unwrap();
2544
2545 let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2547 id: "call_1".to_string(),
2548 name: "shell".to_string(),
2549 input: serde_json::json!({}),
2550 }])
2551 .unwrap();
2552 sqlite
2553 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2554 .await
2555 .unwrap();
2556
2557 let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2559 id: "call_2".to_string(),
2560 name: "read_file".to_string(),
2561 input: serde_json::json!({}),
2562 }])
2563 .unwrap();
2564 sqlite
2565 .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2566 .await
2567 .unwrap();
2568
2569 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2570 std::sync::Arc::new(memory),
2571 cid,
2572 50,
2573 5,
2574 100,
2575 );
2576
2577 let messages_before = agent.msg.messages.len();
2578 agent.load_history().await.unwrap();
2579
2580 assert_eq!(
2582 agent.msg.messages.len(),
2583 messages_before + 1,
2584 "all trailing orphaned tool_use messages must be removed"
2585 );
2586 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2587 }
2588
2589 #[tokio::test]
2590 async fn load_history_no_tool_messages_unchanged() {
2591 let provider = mock_provider(vec![]);
2592 let channel = MockChannel::new(vec![]);
2593 let registry = create_test_registry();
2594 let executor = MockToolExecutor::no_tools();
2595
2596 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2597 let cid = memory.sqlite().create_conversation().await.unwrap();
2598 let sqlite = memory.sqlite();
2599
2600 sqlite.save_message(cid, "user", "hello").await.unwrap();
2601 sqlite
2602 .save_message(cid, "assistant", "hi there")
2603 .await
2604 .unwrap();
2605 sqlite
2606 .save_message(cid, "user", "how are you?")
2607 .await
2608 .unwrap();
2609
2610 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2611 std::sync::Arc::new(memory),
2612 cid,
2613 50,
2614 5,
2615 100,
2616 );
2617
2618 let messages_before = agent.msg.messages.len();
2619 agent.load_history().await.unwrap();
2620
2621 assert_eq!(
2623 agent.msg.messages.len(),
2624 messages_before + 3,
2625 "plain messages without tool parts must pass through unchanged"
2626 );
2627 }
2628
2629 #[tokio::test]
2630 async fn load_history_removes_both_leading_and_trailing_orphans() {
2631 use zeph_llm::provider::MessagePart;
2632
2633 let provider = mock_provider(vec![]);
2634 let channel = MockChannel::new(vec![]);
2635 let registry = create_test_registry();
2636 let executor = MockToolExecutor::no_tools();
2637
2638 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2639 let cid = memory.sqlite().create_conversation().await.unwrap();
2640 let sqlite = memory.sqlite();
2641
2642 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2644 tool_use_id: "call_leading".to_string(),
2645 content: "orphaned result".to_string(),
2646 is_error: false,
2647 }])
2648 .unwrap();
2649 sqlite
2650 .save_message_with_parts(
2651 cid,
2652 "user",
2653 "[tool_result: call_leading]\norphaned result",
2654 &result_parts,
2655 )
2656 .await
2657 .unwrap();
2658
2659 sqlite
2661 .save_message(cid, "user", "what is 2+2?")
2662 .await
2663 .unwrap();
2664 sqlite.save_message(cid, "assistant", "4").await.unwrap();
2665
2666 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2668 id: "call_trailing".to_string(),
2669 name: "shell".to_string(),
2670 input: serde_json::json!({"command": "date"}),
2671 }])
2672 .unwrap();
2673 sqlite
2674 .save_message_with_parts(
2675 cid,
2676 "assistant",
2677 "[tool_use: shell(call_trailing)]",
2678 &use_parts,
2679 )
2680 .await
2681 .unwrap();
2682
2683 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2684 std::sync::Arc::new(memory),
2685 cid,
2686 50,
2687 5,
2688 100,
2689 );
2690
2691 let messages_before = agent.msg.messages.len();
2692 agent.load_history().await.unwrap();
2693
2694 assert_eq!(
2696 agent.msg.messages.len(),
2697 messages_before + 2,
2698 "both leading and trailing orphans must be removed"
2699 );
2700 assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2701 assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2702 assert_eq!(
2703 agent.msg.messages[messages_before + 1].role,
2704 Role::Assistant
2705 );
2706 assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2707 }
2708
2709 #[tokio::test]
2714 async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2715 use zeph_llm::provider::MessagePart;
2716
2717 let provider = mock_provider(vec![]);
2718 let channel = MockChannel::new(vec![]);
2719 let registry = create_test_registry();
2720 let executor = MockToolExecutor::no_tools();
2721
2722 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2723 let cid = memory.sqlite().create_conversation().await.unwrap();
2724 let sqlite = memory.sqlite();
2725
2726 sqlite
2728 .save_message(cid, "user", "first question")
2729 .await
2730 .unwrap();
2731 sqlite
2732 .save_message(cid, "assistant", "first answer")
2733 .await
2734 .unwrap();
2735
2736 let use_parts = serde_json::to_string(&[
2740 MessagePart::ToolUse {
2741 id: "call_mid_1".to_string(),
2742 name: "shell".to_string(),
2743 input: serde_json::json!({"command": "ls"}),
2744 },
2745 MessagePart::Text {
2746 text: "Let me check the files.".to_string(),
2747 },
2748 ])
2749 .unwrap();
2750 sqlite
2751 .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2752 .await
2753 .unwrap();
2754
2755 sqlite
2757 .save_message(cid, "user", "second question")
2758 .await
2759 .unwrap();
2760 sqlite
2761 .save_message(cid, "assistant", "second answer")
2762 .await
2763 .unwrap();
2764
2765 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2766 std::sync::Arc::new(memory),
2767 cid,
2768 50,
2769 5,
2770 100,
2771 );
2772
2773 let messages_before = agent.msg.messages.len();
2774 agent.load_history().await.unwrap();
2775
2776 assert_eq!(
2779 agent.msg.messages.len(),
2780 messages_before + 5,
2781 "message count must be 5 (orphan message kept — has text content)"
2782 );
2783
2784 let orphan = &agent.msg.messages[messages_before + 2];
2786 assert_eq!(orphan.role, Role::Assistant);
2787 assert!(
2788 !orphan
2789 .parts
2790 .iter()
2791 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2792 "orphaned ToolUse parts must be stripped from mid-history message"
2793 );
2794 assert!(
2796 orphan.parts.iter().any(
2797 |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2798 ),
2799 "text content of orphaned assistant message must be preserved"
2800 );
2801 }
2802
2803 #[tokio::test]
2808 async fn load_history_keeps_tool_only_user_message() {
2809 use zeph_llm::provider::MessagePart;
2810
2811 let provider = mock_provider(vec![]);
2812 let channel = MockChannel::new(vec![]);
2813 let registry = create_test_registry();
2814 let executor = MockToolExecutor::no_tools();
2815
2816 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2817 let cid = memory.sqlite().create_conversation().await.unwrap();
2818 let sqlite = memory.sqlite();
2819
2820 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2822 id: "call_rc3".to_string(),
2823 name: "memory_save".to_string(),
2824 input: serde_json::json!({"content": "something"}),
2825 }])
2826 .unwrap();
2827 sqlite
2828 .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2829 .await
2830 .unwrap();
2831
2832 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2834 tool_use_id: "call_rc3".to_string(),
2835 content: "saved".to_string(),
2836 is_error: false,
2837 }])
2838 .unwrap();
2839 sqlite
2840 .save_message_with_parts(cid, "user", "", &result_parts)
2841 .await
2842 .unwrap();
2843
2844 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2845
2846 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2847 std::sync::Arc::new(memory),
2848 cid,
2849 50,
2850 5,
2851 100,
2852 );
2853
2854 let messages_before = agent.msg.messages.len();
2855 agent.load_history().await.unwrap();
2856
2857 assert_eq!(
2860 agent.msg.messages.len(),
2861 messages_before + 3,
2862 "user message with empty content but ToolResult parts must not be dropped"
2863 );
2864
2865 let user_msg = &agent.msg.messages[messages_before + 1];
2867 assert_eq!(user_msg.role, Role::User);
2868 assert!(
2869 user_msg.parts.iter().any(
2870 |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2871 ),
2872 "ToolResult part must be preserved on user message with empty content"
2873 );
2874 }
2875
2876 #[tokio::test]
2880 async fn strip_orphans_removes_orphaned_tool_result() {
2881 use zeph_llm::provider::MessagePart;
2882
2883 let provider = mock_provider(vec![]);
2884 let channel = MockChannel::new(vec![]);
2885 let registry = create_test_registry();
2886 let executor = MockToolExecutor::no_tools();
2887
2888 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2889 let cid = memory.sqlite().create_conversation().await.unwrap();
2890 let sqlite = memory.sqlite();
2891
2892 sqlite.save_message(cid, "user", "hello").await.unwrap();
2894 sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2895
2896 sqlite
2898 .save_message(cid, "assistant", "plain answer")
2899 .await
2900 .unwrap();
2901
2902 let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2904 tool_use_id: "call_nonexistent".to_string(),
2905 content: "stale result".to_string(),
2906 is_error: false,
2907 }])
2908 .unwrap();
2909 sqlite
2910 .save_message_with_parts(
2911 cid,
2912 "user",
2913 "[tool_result: call_nonexistent]\nstale result",
2914 &orphan_result_parts,
2915 )
2916 .await
2917 .unwrap();
2918
2919 sqlite
2920 .save_message(cid, "assistant", "final")
2921 .await
2922 .unwrap();
2923
2924 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2925 std::sync::Arc::new(memory),
2926 cid,
2927 50,
2928 5,
2929 100,
2930 );
2931
2932 let messages_before = agent.msg.messages.len();
2933 agent.load_history().await.unwrap();
2934
2935 let loaded = &agent.msg.messages[messages_before..];
2939 for msg in loaded {
2940 assert!(
2941 !msg.parts.iter().any(|p| matches!(
2942 p,
2943 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2944 )),
2945 "orphaned ToolResult part must be stripped from history"
2946 );
2947 }
2948 }
2949
2950 #[tokio::test]
2953 async fn strip_orphans_keeps_complete_pair() {
2954 use zeph_llm::provider::MessagePart;
2955
2956 let provider = mock_provider(vec![]);
2957 let channel = MockChannel::new(vec![]);
2958 let registry = create_test_registry();
2959 let executor = MockToolExecutor::no_tools();
2960
2961 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2962 let cid = memory.sqlite().create_conversation().await.unwrap();
2963 let sqlite = memory.sqlite();
2964
2965 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2966 id: "call_valid".to_string(),
2967 name: "shell".to_string(),
2968 input: serde_json::json!({"command": "ls"}),
2969 }])
2970 .unwrap();
2971 sqlite
2972 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2973 .await
2974 .unwrap();
2975
2976 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2977 tool_use_id: "call_valid".to_string(),
2978 content: "file.rs".to_string(),
2979 is_error: false,
2980 }])
2981 .unwrap();
2982 sqlite
2983 .save_message_with_parts(cid, "user", "", &result_parts)
2984 .await
2985 .unwrap();
2986
2987 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2988 std::sync::Arc::new(memory),
2989 cid,
2990 50,
2991 5,
2992 100,
2993 );
2994
2995 let messages_before = agent.msg.messages.len();
2996 agent.load_history().await.unwrap();
2997
2998 assert_eq!(
2999 agent.msg.messages.len(),
3000 messages_before + 2,
3001 "complete tool_use/tool_result pair must be preserved"
3002 );
3003
3004 let user_msg = &agent.msg.messages[messages_before + 1];
3005 assert!(
3006 user_msg.parts.iter().any(|p| matches!(
3007 p,
3008 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
3009 )),
3010 "ToolResult part for a matched tool_use must not be stripped"
3011 );
3012 }
3013
3014 #[tokio::test]
3017 async fn strip_orphans_mixed_history() {
3018 use zeph_llm::provider::MessagePart;
3019
3020 let provider = mock_provider(vec![]);
3021 let channel = MockChannel::new(vec![]);
3022 let registry = create_test_registry();
3023 let executor = MockToolExecutor::no_tools();
3024
3025 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3026 let cid = memory.sqlite().create_conversation().await.unwrap();
3027 let sqlite = memory.sqlite();
3028
3029 let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
3031 id: "call_good".to_string(),
3032 name: "shell".to_string(),
3033 input: serde_json::json!({"command": "pwd"}),
3034 }])
3035 .unwrap();
3036 sqlite
3037 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
3038 .await
3039 .unwrap();
3040
3041 let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
3042 tool_use_id: "call_good".to_string(),
3043 content: "/home".to_string(),
3044 is_error: false,
3045 }])
3046 .unwrap();
3047 sqlite
3048 .save_message_with_parts(cid, "user", "", &result_parts_ok)
3049 .await
3050 .unwrap();
3051
3052 sqlite
3054 .save_message(cid, "assistant", "text only")
3055 .await
3056 .unwrap();
3057
3058 let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
3059 tool_use_id: "call_ghost".to_string(),
3060 content: "ghost result".to_string(),
3061 is_error: false,
3062 }])
3063 .unwrap();
3064 sqlite
3065 .save_message_with_parts(
3066 cid,
3067 "user",
3068 "[tool_result: call_ghost]\nghost result",
3069 &orphan_parts,
3070 )
3071 .await
3072 .unwrap();
3073
3074 sqlite
3075 .save_message(cid, "assistant", "final reply")
3076 .await
3077 .unwrap();
3078
3079 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3080 std::sync::Arc::new(memory),
3081 cid,
3082 50,
3083 5,
3084 100,
3085 );
3086
3087 let messages_before = agent.msg.messages.len();
3088 agent.load_history().await.unwrap();
3089
3090 let loaded = &agent.msg.messages[messages_before..];
3091
3092 for msg in loaded {
3094 assert!(
3095 !msg.parts.iter().any(|p| matches!(
3096 p,
3097 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
3098 )),
3099 "orphaned ToolResult (call_ghost) must be stripped from history"
3100 );
3101 }
3102
3103 let has_good_result = loaded.iter().any(|msg| {
3106 msg.role == Role::User
3107 && msg.parts.iter().any(|p| {
3108 matches!(
3109 p,
3110 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
3111 )
3112 })
3113 });
3114 assert!(
3115 has_good_result,
3116 "matched ToolResult (call_good) must be preserved in history"
3117 );
3118 }
3119
3120 #[tokio::test]
3123 async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
3124 use zeph_llm::provider::MessagePart;
3125
3126 let provider = mock_provider(vec![]);
3127 let channel = MockChannel::new(vec![]);
3128 let registry = create_test_registry();
3129 let executor = MockToolExecutor::no_tools();
3130
3131 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3132 let cid = memory.sqlite().create_conversation().await.unwrap();
3133 let sqlite = memory.sqlite();
3134
3135 sqlite
3136 .save_message(cid, "user", "run a command")
3137 .await
3138 .unwrap();
3139
3140 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3142 id: "call_ok".to_string(),
3143 name: "shell".to_string(),
3144 input: serde_json::json!({"command": "echo hi"}),
3145 }])
3146 .unwrap();
3147 sqlite
3148 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
3149 .await
3150 .unwrap();
3151
3152 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3154 tool_use_id: "call_ok".to_string(),
3155 content: "hi".to_string(),
3156 is_error: false,
3157 }])
3158 .unwrap();
3159 sqlite
3160 .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
3161 .await
3162 .unwrap();
3163
3164 sqlite.save_message(cid, "assistant", "done").await.unwrap();
3165
3166 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3167 std::sync::Arc::new(memory),
3168 cid,
3169 50,
3170 5,
3171 100,
3172 );
3173
3174 let messages_before = agent.msg.messages.len();
3175 agent.load_history().await.unwrap();
3176
3177 assert_eq!(
3179 agent.msg.messages.len(),
3180 messages_before + 4,
3181 "matched tool pair must not be removed"
3182 );
3183 let tool_msg = &agent.msg.messages[messages_before + 1];
3184 assert!(
3185 tool_msg
3186 .parts
3187 .iter()
3188 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
3189 "matched ToolUse parts must be preserved"
3190 );
3191 }
3192
3193 #[tokio::test]
3197 async fn persist_cancelled_tool_results_pairs_tool_use() {
3198 use zeph_llm::provider::MessagePart;
3199
3200 let provider = mock_provider(vec![]);
3201 let channel = MockChannel::new(vec![]);
3202 let registry = create_test_registry();
3203 let executor = MockToolExecutor::no_tools();
3204
3205 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3206 let cid = memory.sqlite().create_conversation().await.unwrap();
3207
3208 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3209 std::sync::Arc::new(memory),
3210 cid,
3211 50,
3212 5,
3213 100,
3214 );
3215
3216 let tool_calls = vec![
3218 zeph_llm::provider::ToolUseRequest {
3219 id: "cancel_id_1".to_string(),
3220 name: "shell".to_string().into(),
3221 input: serde_json::json!({}),
3222 },
3223 zeph_llm::provider::ToolUseRequest {
3224 id: "cancel_id_2".to_string(),
3225 name: "read_file".to_string().into(),
3226 input: serde_json::json!({}),
3227 },
3228 ];
3229
3230 agent.persist_cancelled_tool_results(&tool_calls).await;
3231
3232 let history = agent
3233 .memory_state
3234 .persistence
3235 .memory
3236 .as_ref()
3237 .unwrap()
3238 .sqlite()
3239 .load_history(cid, 50)
3240 .await
3241 .unwrap();
3242
3243 assert_eq!(history.len(), 1);
3245 assert_eq!(history[0].role, Role::User);
3246
3247 for tc in &tool_calls {
3249 assert!(
3250 history[0].parts.iter().any(|p| matches!(
3251 p,
3252 MessagePart::ToolResult { tool_use_id, is_error, .. }
3253 if tool_use_id == &tc.id && *is_error
3254 )),
3255 "tombstone ToolResult for {} must be present and is_error=true",
3256 tc.id
3257 );
3258 }
3259 }
3260
#[test]
/// An empty string is expected to be reported as not meaningful.
fn meaningful_content_empty_string() {
    let input = "";
    assert!(!has_meaningful_content(input));
}
3267
#[test]
/// Input made only of spaces, a newline and a tab is expected to be reported
/// as not meaningful.
fn meaningful_content_whitespace_only() {
    let input = " \n\t ";
    assert!(!has_meaningful_content(input));
}
3272
#[test]
/// A lone `[tool_use: name(id)]` tag is expected to be reported as not
/// meaningful.
fn meaningful_content_tool_use_only() {
    let input = "[tool_use: shell(call_1)]";
    assert!(!has_meaningful_content(input));
}
3277
#[test]
/// A lone `[tool_use: name]` tag without a parenthesised id is expected to be
/// reported as not meaningful.
fn meaningful_content_tool_use_no_parens() {
    let input = "[tool_use: memory_save]";
    assert!(!has_meaningful_content(input));
}
3283
#[test]
/// A `[tool_result: id]` tag followed by its output body is expected to be
/// reported as not meaningful.
fn meaningful_content_tool_result_with_body() {
    let input = "[tool_result: call_1]\nsome output here";
    assert!(!has_meaningful_content(input));
}
3290
#[test]
/// A `[tool_result: id]` tag followed only by a newline is expected to be
/// reported as not meaningful.
fn meaningful_content_tool_result_empty_body() {
    let input = "[tool_result: call_1]\n";
    assert!(!has_meaningful_content(input));
}
3295
#[test]
/// A `[tool output: name]` tag with its result on the same line is expected to
/// be reported as not meaningful.
fn meaningful_content_tool_output_inline() {
    let input = "[tool output: bash] some result";
    assert!(!has_meaningful_content(input));
}
3300
#[test]
/// A `[tool output: name]` tag followed by the `(pruned)` marker is expected to
/// be reported as not meaningful.
fn meaningful_content_tool_output_pruned() {
    let input = "[tool output: bash] (pruned)";
    assert!(!has_meaningful_content(input));
}
3305
#[test]
/// A `[tool output: name]` tag followed by a fenced code block is expected to
/// be reported as not meaningful.
fn meaningful_content_tool_output_fenced() {
    let input = "[tool output: bash]\n```\nls output\n```";
    assert!(!has_meaningful_content(input));
}
3312
#[test]
/// Two adjacent `[tool_use: …]` tags with nothing between them are expected to
/// be reported as not meaningful.
fn meaningful_content_multiple_tool_use_tags() {
    let input = "[tool_use: bash(id1)][tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3319
#[test]
/// Two `[tool_use: …]` tags separated by a single space are expected to be
/// reported as not meaningful.
fn meaningful_content_multiple_tool_use_tags_space_separator() {
    let input = "[tool_use: bash(id1)] [tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3327
#[test]
/// Two `[tool_use: …]` tags separated by a newline are expected to be reported
/// as not meaningful.
fn meaningful_content_multiple_tool_use_tags_newline_separator() {
    let input = "[tool_use: bash(id1)]\n[tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3335
#[test]
/// A `[tool_result: …]` tag with a body, followed by a `[tool_use: …]` tag, is
/// expected to be reported as not meaningful.
fn meaningful_content_tool_result_followed_by_tool_use() {
    let input = "[tool_result: call_1]\nresult\n[tool_use: bash(call_2)]";
    assert!(!has_meaningful_content(input));
}
3343
#[test]
/// Ordinary prose without any tool tag is expected to be reported as
/// meaningful.
fn meaningful_content_real_text_only() {
    let input = "Hello, how can I help you?";
    assert!(has_meaningful_content(input));
}
3348
#[test]
/// Prose that precedes a `[tool_use: …]` tag is expected to make the content
/// meaningful.
fn meaningful_content_text_before_tool_tag() {
    let input = "Let me check. [tool_use: bash(id)]";
    assert!(has_meaningful_content(input));
}
3353
#[test]
/// Prose that follows a `[tool_use: …]` tag on the same line is expected to
/// make the content meaningful.
fn meaningful_content_text_after_tool_use_tag() {
    let input = "[tool_use: bash] I ran the command";
    assert!(has_meaningful_content(input));
}
3361
#[test]
/// Prose on its own line between two `[tool_use: …]` tags is expected to make
/// the content meaningful.
fn meaningful_content_text_between_tags() {
    let input = "[tool_use: bash(id1)]\nand then\n[tool_use: read(id2)]";
    assert!(has_meaningful_content(input));
}
3368
#[test]
/// A `[tool_use: ` prefix with no closing bracket is expected to be reported
/// as meaningful rather than being treated as a tag.
fn meaningful_content_malformed_tag_no_closing_bracket() {
    let input = "[tool_use: ";
    assert!(has_meaningful_content(input));
}
3374
#[test]
/// A `[tool_use: …]` tag followed by a `[tool_result: …]` tag and its body is
/// expected to be reported as not meaningful.
fn meaningful_content_tool_use_and_tool_result_only() {
    let input = "[tool_use: memory_save(call_abc)]\n[tool_result: call_abc]\nsaved";
    assert!(!has_meaningful_content(input));
}
3382
#[test]
/// A `[tool_result: …]` body that itself starts with `[` (a JSON array) is
/// expected to be reported as not meaningful.
fn meaningful_content_tool_result_body_with_json_array() {
    let input = "[tool_result: id1]\n[\"array\", \"value\"]";
    assert!(!has_meaningful_content(input));
}
3389
3390 #[tokio::test]
3401 async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
3402 use zeph_llm::provider::MessagePart;
3403
3404 let provider = mock_provider(vec![]);
3405 let channel = MockChannel::new(vec![]);
3406 let registry = create_test_registry();
3407 let executor = MockToolExecutor::no_tools();
3408
3409 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3410 let cid = memory.sqlite().create_conversation().await.unwrap();
3411 let sqlite = memory.sqlite();
3412
3413 sqlite
3415 .save_message(cid, "user", "save this for me")
3416 .await
3417 .unwrap();
3418
3419 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3422 id: "call_2529".to_string(),
3423 name: "memory_save".to_string(),
3424 input: serde_json::json!({"content": "save this"}),
3425 }])
3426 .unwrap();
3427 let orphan_assistant_id = sqlite
3428 .save_message_with_parts(
3429 cid,
3430 "assistant",
3431 "[tool_use: memory_save(call_2529)]",
3432 &use_parts,
3433 )
3434 .await
3435 .unwrap();
3436
3437 sqlite
3442 .save_message(cid, "assistant", "here is a plain reply")
3443 .await
3444 .unwrap();
3445
3446 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3447 tool_use_id: "call_2529".to_string(),
3448 content: "saved".to_string(),
3449 is_error: false,
3450 }])
3451 .unwrap();
3452 let orphan_user_id = sqlite
3453 .save_message_with_parts(
3454 cid,
3455 "user",
3456 "[tool_result: call_2529]\nsaved",
3457 &result_parts,
3458 )
3459 .await
3460 .unwrap();
3461
3462 sqlite.save_message(cid, "assistant", "done").await.unwrap();
3464
3465 let memory_arc = std::sync::Arc::new(memory);
3466 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3467 memory_arc.clone(),
3468 cid,
3469 50,
3470 5,
3471 100,
3472 );
3473
3474 agent.load_history().await.unwrap();
3475
3476 let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3479 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3480 )
3481 .bind(orphan_assistant_id)
3482 .fetch_all(memory_arc.sqlite().pool())
3483 .await
3484 .unwrap();
3485
3486 let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3487 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3488 )
3489 .bind(orphan_user_id)
3490 .fetch_all(memory_arc.sqlite().pool())
3491 .await
3492 .unwrap();
3493
3494 assert_eq!(
3495 assistant_deleted_count.first().copied().unwrap_or(0),
3496 1,
3497 "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3498 );
3499 assert_eq!(
3500 user_deleted_count.first().copied().unwrap_or(0),
3501 1,
3502 "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3503 );
3504 }
3505
3506 #[tokio::test]
3510 async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3511 use zeph_llm::provider::MessagePart;
3512
3513 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3514 let cid = memory.sqlite().create_conversation().await.unwrap();
3515 let sqlite = memory.sqlite();
3516
3517 sqlite
3519 .save_message(cid, "user", "do something")
3520 .await
3521 .unwrap();
3522
3523 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3525 id: "call_idem".to_string(),
3526 name: "shell".to_string(),
3527 input: serde_json::json!({"command": "ls"}),
3528 }])
3529 .unwrap();
3530 sqlite
3531 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3532 .await
3533 .unwrap();
3534
3535 sqlite
3537 .save_message(cid, "assistant", "continuing")
3538 .await
3539 .unwrap();
3540
3541 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3543 tool_use_id: "call_idem".to_string(),
3544 content: "output".to_string(),
3545 is_error: false,
3546 }])
3547 .unwrap();
3548 sqlite
3549 .save_message_with_parts(
3550 cid,
3551 "user",
3552 "[tool_result: call_idem]\noutput",
3553 &result_parts,
3554 )
3555 .await
3556 .unwrap();
3557
3558 sqlite
3559 .save_message(cid, "assistant", "final")
3560 .await
3561 .unwrap();
3562
3563 let memory_arc = std::sync::Arc::new(memory);
3564
3565 let mut agent1 = Agent::new(
3567 mock_provider(vec![]),
3568 MockChannel::new(vec![]),
3569 create_test_registry(),
3570 None,
3571 5,
3572 MockToolExecutor::no_tools(),
3573 )
3574 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3575 agent1.load_history().await.unwrap();
3576 let count_after_first = agent1.msg.messages.len();
3577
3578 let mut agent2 = Agent::new(
3581 mock_provider(vec![]),
3582 MockChannel::new(vec![]),
3583 create_test_registry(),
3584 None,
3585 5,
3586 MockToolExecutor::no_tools(),
3587 )
3588 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3589 agent2.load_history().await.unwrap();
3590 let count_after_second = agent2.msg.messages.len();
3591
3592 assert_eq!(
3594 count_after_first, count_after_second,
3595 "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3596 );
3597 }
3598
3599 #[tokio::test]
3603 async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3604 use zeph_llm::provider::MessagePart;
3605
3606 let provider = mock_provider(vec![]);
3607 let channel = MockChannel::new(vec![]);
3608 let registry = create_test_registry();
3609 let executor = MockToolExecutor::no_tools();
3610
3611 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3612 let cid = memory.sqlite().create_conversation().await.unwrap();
3613 let sqlite = memory.sqlite();
3614
3615 sqlite
3617 .save_message(cid, "user", "check the files")
3618 .await
3619 .unwrap();
3620
3621 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3624 id: "call_mixed".to_string(),
3625 name: "shell".to_string(),
3626 input: serde_json::json!({"command": "ls"}),
3627 }])
3628 .unwrap();
3629 sqlite
3630 .save_message_with_parts(
3631 cid,
3632 "assistant",
3633 "Let me list the directory. [tool_use: shell(call_mixed)]",
3634 &use_parts,
3635 )
3636 .await
3637 .unwrap();
3638
3639 sqlite.save_message(cid, "user", "thanks").await.unwrap();
3641 sqlite
3642 .save_message(cid, "assistant", "you are welcome")
3643 .await
3644 .unwrap();
3645
3646 let memory_arc = std::sync::Arc::new(memory);
3647 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3648 memory_arc.clone(),
3649 cid,
3650 50,
3651 5,
3652 100,
3653 );
3654
3655 let messages_before = agent.msg.messages.len();
3656 agent.load_history().await.unwrap();
3657
3658 assert_eq!(
3660 agent.msg.messages.len(),
3661 messages_before + 4,
3662 "assistant message with text + tool tag must not be removed after ToolUse strip"
3663 );
3664
3665 let mixed_msg = agent
3667 .msg
3668 .messages
3669 .iter()
3670 .find(|m| m.content.contains("Let me list the directory"))
3671 .expect("mixed-content assistant message must still be in history");
3672 assert!(
3673 !mixed_msg
3674 .parts
3675 .iter()
3676 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3677 "orphaned ToolUse parts must be stripped even when message has meaningful text"
3678 );
3679 assert_eq!(
3680 mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3681 "content field must be unchanged — only parts are stripped"
3682 );
3683 }
3684
3685 #[tokio::test]
3688 async fn persist_message_skipped_tool_result_does_not_embed() {
3689 use zeph_llm::provider::MessagePart;
3690
3691 let provider = mock_provider(vec![]);
3692 let channel = MockChannel::new(vec![]);
3693 let registry = create_test_registry();
3694 let executor = MockToolExecutor::no_tools();
3695
3696 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3697 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3698 let cid = memory.sqlite().create_conversation().await.unwrap();
3699
3700 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3701 .with_metrics(tx)
3702 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3703 agent.memory_state.persistence.autosave_assistant = true;
3704 agent.memory_state.persistence.autosave_min_length = 0;
3705
3706 let parts = vec![MessagePart::ToolResult {
3707 tool_use_id: "tu1".into(),
3708 content: "[skipped] bash tool was blocked by utility gate".into(),
3709 is_error: false,
3710 }];
3711
3712 agent
3713 .persist_message(
3714 Role::User,
3715 "[skipped] bash tool was blocked by utility gate",
3716 &parts,
3717 false,
3718 )
3719 .await;
3720
3721 assert_eq!(
3722 rx.borrow().embeddings_generated,
3723 0,
3724 "[skipped] ToolResult must not be embedded into Qdrant"
3725 );
3726 }
3727
3728 #[tokio::test]
3729 async fn persist_message_stopped_tool_result_does_not_embed() {
3730 use zeph_llm::provider::MessagePart;
3731
3732 let provider = mock_provider(vec![]);
3733 let channel = MockChannel::new(vec![]);
3734 let registry = create_test_registry();
3735 let executor = MockToolExecutor::no_tools();
3736
3737 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3738 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3739 let cid = memory.sqlite().create_conversation().await.unwrap();
3740
3741 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3742 .with_metrics(tx)
3743 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3744 agent.memory_state.persistence.autosave_assistant = true;
3745 agent.memory_state.persistence.autosave_min_length = 0;
3746
3747 let parts = vec![MessagePart::ToolResult {
3748 tool_use_id: "tu2".into(),
3749 content: "[stopped] execution limit reached".into(),
3750 is_error: false,
3751 }];
3752
3753 agent
3754 .persist_message(
3755 Role::User,
3756 "[stopped] execution limit reached",
3757 &parts,
3758 false,
3759 )
3760 .await;
3761
3762 assert_eq!(
3763 rx.borrow().embeddings_generated,
3764 0,
3765 "[stopped] ToolResult must not be embedded into Qdrant"
3766 );
3767 }
3768
3769 #[tokio::test]
3770 async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3771 use zeph_llm::provider::MessagePart;
3774
3775 let provider = mock_provider(vec![]);
3776 let channel = MockChannel::new(vec![]);
3777 let registry = create_test_registry();
3778 let executor = MockToolExecutor::no_tools();
3779
3780 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3781 let cid = memory.sqlite().create_conversation().await.unwrap();
3782 let memory_arc = std::sync::Arc::new(memory);
3783
3784 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3785 memory_arc.clone(),
3786 cid,
3787 50,
3788 5,
3789 100,
3790 );
3791 agent.memory_state.persistence.autosave_assistant = true;
3792 agent.memory_state.persistence.autosave_min_length = 0;
3793
3794 let content = "total 42\ndrwxr-xr-x 5 user group";
3795 let parts = vec![MessagePart::ToolResult {
3796 tool_use_id: "tu3".into(),
3797 content: content.into(),
3798 is_error: false,
3799 }];
3800
3801 agent
3802 .persist_message(Role::User, content, &parts, false)
3803 .await;
3804
3805 let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3807 assert_eq!(
3808 history.len(),
3809 1,
3810 "normal ToolResult must be saved to SQLite"
3811 );
3812 assert_eq!(history[0].content, content);
3813 }
3814
#[test]
/// Checks the tail-window computation `len.saturating_sub(max_messages)` on a history
/// longer than the window, and takes the actual `[tail_start..]` slice so an
/// out-of-range start would panic here instead of going unnoticed.
///
/// NOTE(review): presumably mirrors the slicing done by trajectory extraction; this test
/// does not call production code — confirm it stays in sync with the real call site.
fn trajectory_extraction_slice_bounds_messages() {
    let max_messages: usize = 20;
    let total_messages = 100usize;

    // Stand-in history: element `i` stores its own index so the window position is checkable.
    let messages: Vec<usize> = (0..total_messages).collect();

    let tail_start = messages.len().saturating_sub(max_messages);
    let tail = &messages[tail_start..];
    let window = tail.len();

    assert_eq!(
        window, 20,
        "slice should contain exactly max_messages items"
    );
    assert_eq!(tail_start, 80, "slice should start at len - max_messages");
    // The window must be the *last* max_messages entries, in order.
    assert_eq!(tail.first(), Some(&80));
    assert_eq!(tail.last(), Some(&99));
}
3834
#[test]
/// Checks the tail-window computation when the history is shorter than the window:
/// `saturating_sub` must clamp the start to 0 and the `[tail_start..]` slice must cover
/// every message.
///
/// NOTE(review): presumably mirrors the slicing done by trajectory extraction; this test
/// does not call production code — confirm it stays in sync with the real call site.
fn trajectory_extraction_slice_handles_few_messages() {
    let max_messages: usize = 20;
    let total_messages = 5usize;

    // Stand-in history: element `i` stores its own index.
    let messages: Vec<usize> = (0..total_messages).collect();

    let tail_start = messages.len().saturating_sub(max_messages);
    let tail = &messages[tail_start..];
    let window = tail.len();

    assert_eq!(window, 5, "should return all messages when fewer than max");
    assert_eq!(tail_start, 0, "slice should start from the beginning");
    // With nothing trimmed, the window is the whole history.
    assert_eq!(tail, messages.as_slice());
}
3846
3847 #[tokio::test]
3853 async fn regression_3168_complete_tool_pair_survives_round_trip() {
3854 use zeph_llm::provider::MessagePart;
3855
3856 let provider = mock_provider(vec![]);
3857 let channel = MockChannel::new(vec![]);
3858 let registry = create_test_registry();
3859 let executor = MockToolExecutor::no_tools();
3860
3861 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3862 let cid = memory.sqlite().create_conversation().await.unwrap();
3863 let sqlite = memory.sqlite();
3864
3865 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3866 id: "r3168_call".to_string(),
3867 name: "shell".to_string(),
3868 input: serde_json::json!({"command": "echo hi"}),
3869 }])
3870 .unwrap();
3871 sqlite
3872 .save_message_with_parts(
3873 cid,
3874 "assistant",
3875 "[tool_use: shell(r3168_call)]",
3876 &use_parts,
3877 )
3878 .await
3879 .unwrap();
3880
3881 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3882 tool_use_id: "r3168_call".to_string(),
3883 content: "[skipped]".to_string(),
3884 is_error: false,
3885 }])
3886 .unwrap();
3887 sqlite
3888 .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3889 .await
3890 .unwrap();
3891
3892 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3893 std::sync::Arc::new(memory),
3894 cid,
3895 50,
3896 5,
3897 100,
3898 );
3899
3900 let base = agent.msg.messages.len();
3901 agent.load_history().await.unwrap();
3902
3903 assert_eq!(
3904 agent.msg.messages.len(),
3905 base + 2,
3906 "both messages of the complete pair must survive load_history"
3907 );
3908
3909 let assistant_msg = agent
3910 .msg
3911 .messages
3912 .iter()
3913 .find(|m| m.role == Role::Assistant)
3914 .expect("assistant message missing after load_history");
3915 assert!(
3916 assistant_msg
3917 .parts
3918 .iter()
3919 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3920 "ToolUse part must be preserved in assistant message"
3921 );
3922
3923 let user_msg = agent
3924 .msg
3925 .messages
3926 .iter()
3927 .rev()
3928 .find(|m| m.role == Role::User)
3929 .expect("user message missing after load_history");
3930 assert!(
3931 user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3932 "ToolResult part must be preserved in user message"
3933 );
3934 }
3935
#[test]
/// Regression test for issue 3168: a `System` message sitting between an assistant
/// `ToolUse` and the user `ToolResult` with the same id must not make
/// `strip_mid_history_orphans` treat the pair as orphaned.
fn regression_3168_system_between_tool_pair_not_stripped() {
    use zeph_llm::provider::{MessageMetadata, MessagePart};

    let tool_id = "call_sys_between".to_string();

    let assistant_call = Message {
        role: Role::Assistant,
        content: "[tool_use: shell(call_sys_between)]".to_string(),
        parts: vec![MessagePart::ToolUse {
            id: tool_id.clone(),
            name: "shell".to_string(),
            input: serde_json::json!({"command": "ls"}),
        }],
        metadata: MessageMetadata::default(),
    };
    // Part-less system message placed between the two halves of the pair.
    let system_hint = Message {
        role: Role::System,
        content: "system hint injected between tool calls".to_string(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    let user_result = Message {
        role: Role::User,
        content: "[tool_result: call_sys_between]".to_string(),
        parts: vec![MessagePart::ToolResult {
            tool_use_id: tool_id.clone(),
            content: "output".to_string(),
            is_error: false,
        }],
        metadata: MessageMetadata::default(),
    };

    let mut messages = vec![assistant_call, system_hint, user_result];

    // Only the removal count matters here; the returned db ids are ignored.
    let (removed, _) = strip_mid_history_orphans(&mut messages);

    assert_eq!(
        removed, 0,
        "no messages should be removed when System sits between a matched tool_use/tool_result pair"
    );
    assert_eq!(messages.len(), 3, "all three messages must remain");

    let tool_use_kept = messages[0]
        .parts
        .iter()
        .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == &tool_id));
    assert!(
        tool_use_kept,
        "ToolUse part must not be stripped from assistant message"
    );
}
3988
3989 #[tokio::test]
3994 async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3995 use zeph_llm::provider::MessagePart;
3996
3997 let provider = mock_provider(vec![]);
3998 let channel = MockChannel::new(vec![]);
3999 let registry = create_test_registry();
4000 let executor = MockToolExecutor::no_tools();
4001
4002 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
4003 let cid = memory.sqlite().create_conversation().await.unwrap();
4004 let sqlite = memory.sqlite();
4005
4006 sqlite
4010 .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
4011 .await
4012 .unwrap();
4013
4014 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
4016 tool_use_id: "corrupt".to_string(),
4017 content: "result".to_string(),
4018 is_error: false,
4019 }])
4020 .unwrap();
4021 sqlite
4022 .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
4023 .await
4024 .unwrap();
4025
4026 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
4027 std::sync::Arc::new(memory),
4028 cid,
4029 50,
4030 5,
4031 100,
4032 );
4033
4034 let base = agent.msg.messages.len();
4035 agent.load_history().await.unwrap();
4036
4037 let loaded = agent.msg.messages.len() - base;
4042 let orphan_present = agent.msg.messages.iter().any(|m| {
4045 m.role == Role::User
4046 && m.parts.iter().any(|p| {
4047 matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
4048 })
4049 });
4050 assert!(
4051 !orphan_present,
4052 "orphaned ToolResult must not survive load_history; loaded={loaded}"
4053 );
4054 }
4055}