1use std::collections::HashSet;
5
6use crate::channel::Channel;
7use zeph_llm::provider::{LlmProvider as _, Message, MessagePart, Role};
8use zeph_memory::store::role_str;
9
10use super::Agent;
11
12fn sanitize_tool_pairs(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
28 let mut removed = 0;
29 let mut db_ids: Vec<i64> = Vec::new();
30
31 loop {
32 if let Some(last) = messages.last()
34 && last.role == Role::Assistant
35 && last
36 .parts
37 .iter()
38 .any(|p| matches!(p, MessagePart::ToolUse { .. }))
39 {
40 let ids: Vec<String> = last
41 .parts
42 .iter()
43 .filter_map(|p| {
44 if let MessagePart::ToolUse { id, .. } = p {
45 Some(id.clone())
46 } else {
47 None
48 }
49 })
50 .collect();
51 tracing::warn!(
52 tool_ids = ?ids,
53 "removing orphaned trailing tool_use message from restored history"
54 );
55 if let Some(db_id) = messages.last().and_then(|m| m.metadata.db_id) {
56 db_ids.push(db_id);
57 }
58 messages.pop();
59 removed += 1;
60 continue;
61 }
62
63 if let Some(first) = messages.first()
65 && first.role == Role::User
66 && first
67 .parts
68 .iter()
69 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
70 {
71 let ids: Vec<String> = first
72 .parts
73 .iter()
74 .filter_map(|p| {
75 if let MessagePart::ToolResult { tool_use_id, .. } = p {
76 Some(tool_use_id.clone())
77 } else {
78 None
79 }
80 })
81 .collect();
82 tracing::warn!(
83 tool_use_ids = ?ids,
84 "removing orphaned leading tool_result message from restored history"
85 );
86 if let Some(db_id) = messages.first().and_then(|m| m.metadata.db_id) {
87 db_ids.push(db_id);
88 }
89 messages.remove(0);
90 removed += 1;
91 continue;
92 }
93
94 break;
95 }
96
97 let (mid_removed, mid_db_ids) = strip_mid_history_orphans(messages);
100 removed += mid_removed;
101 db_ids.extend(mid_db_ids);
102
103 (removed, db_ids)
104}
105
106fn orphaned_tool_use_ids(msg: &Message, next_msg: Option<&Message>) -> HashSet<String> {
108 let matched: HashSet<String> = next_msg
109 .filter(|n| n.role == Role::User)
110 .map(|n| {
111 msg.parts
112 .iter()
113 .filter_map(|p| if let MessagePart::ToolUse { id, .. } = p { Some(id.clone()) } else { None })
114 .filter(|uid| n.parts.iter().any(|np| matches!(np, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == uid)))
115 .collect()
116 })
117 .unwrap_or_default();
118 msg.parts
119 .iter()
120 .filter_map(|p| {
121 if let MessagePart::ToolUse { id, .. } = p
122 && !matched.contains(id)
123 {
124 Some(id.clone())
125 } else {
126 None
127 }
128 })
129 .collect()
130}
131
132fn orphaned_tool_result_ids(msg: &Message, prev_msg: Option<&Message>) -> HashSet<String> {
134 let avail: HashSet<&str> = prev_msg
135 .filter(|p| p.role == Role::Assistant)
136 .map(|p| {
137 p.parts
138 .iter()
139 .filter_map(|part| {
140 if let MessagePart::ToolUse { id, .. } = part {
141 Some(id.as_str())
142 } else {
143 None
144 }
145 })
146 .collect()
147 })
148 .unwrap_or_default();
149 msg.parts
150 .iter()
151 .filter_map(|p| {
152 if let MessagePart::ToolResult { tool_use_id, .. } = p
153 && !avail.contains(tool_use_id.as_str())
154 {
155 Some(tool_use_id.clone())
156 } else {
157 None
158 }
159 })
160 .collect()
161}
162
/// Reports whether `content` holds anything besides tool bookkeeping tags.
///
/// Recognised tags are `[tool_use: …]`, `[tool_result: …]` and
/// `[tool output: …]`. The scan walks the tags left to right:
/// - non-whitespace text in front of a tag makes the content meaningful;
/// - a tag without a closing `]` is treated as meaningful;
/// - after a `tool_result` / `tool output` tag, everything up to the next tag
///   (or the end of the string) is skipped as the tool's output body;
/// - whatever non-whitespace text remains after the last tag is meaningful.
fn has_meaningful_content(content: &str) -> bool {
    // Each tag is paired with a flag saying whether a skipped body follows it.
    const TAGS: [(&str, bool); 3] = [
        ("[tool_use: ", false),
        ("[tool_result: ", true),
        ("[tool output: ", true),
    ];

    /// Finds the tag that starts earliest in `text`, if any.
    fn first_tag(text: &str) -> Option<(usize, &'static str, bool)> {
        TAGS.iter()
            .filter_map(|&(tag, skips_body)| text.find(tag).map(|at| (at, tag, skips_body)))
            .min_by_key(|&(at, _, _)| at)
    }

    let mut rest = content.trim();

    while let Some((at, tag, skips_body)) = first_tag(rest) {
        let (before, from_tag) = rest.split_at(at);
        if !before.trim().is_empty() {
            return true;
        }

        let after_open = &from_tag[tag.len()..];
        let Some(close) = after_open.find(']') else {
            // Malformed tag: keep the message rather than risk dropping text.
            return true;
        };
        let after_tag = &after_open[close + 1..];

        rest = if skips_body {
            let body = after_tag.trim_start_matches('\n');
            match first_tag(body) {
                Some((next_at, _, _)) => &body[next_at..],
                None => "",
            }
        } else {
            after_tag
        };
    }

    !rest.trim().is_empty()
}
232
233fn strip_mid_history_orphans(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
246 let mut removed = 0;
247 let mut db_ids: Vec<i64> = Vec::new();
248 let mut i = 0;
249 while i < messages.len() {
250 if messages[i].role == Role::Assistant
254 && messages[i]
255 .parts
256 .iter()
257 .any(|p| matches!(p, MessagePart::ToolUse { .. }))
258 {
259 let next_non_system = (i + 1..messages.len())
260 .find(|&j| messages[j].role != Role::System)
261 .and_then(|j| messages.get(j));
262 let orphaned_ids = orphaned_tool_use_ids(&messages[i], next_non_system);
263 if !orphaned_ids.is_empty() {
264 tracing::warn!(
265 tool_ids = ?orphaned_ids,
266 index = i,
267 "stripping orphaned mid-history tool_use parts from assistant message"
268 );
269 messages[i].parts.retain(
270 |p| !matches!(p, MessagePart::ToolUse { id, .. } if orphaned_ids.contains(id)),
271 );
272 let is_empty =
273 !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
274 if is_empty {
275 if let Some(db_id) = messages[i].metadata.db_id {
276 db_ids.push(db_id);
277 }
278 messages.remove(i);
279 removed += 1;
280 continue; }
282 }
283 }
284
285 if messages[i].role == Role::User
287 && messages[i]
288 .parts
289 .iter()
290 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
291 {
292 let prev_non_system = (0..i)
293 .rev()
294 .find(|&j| messages[j].role != Role::System)
295 .and_then(|j| messages.get(j));
296 let orphaned_ids = orphaned_tool_result_ids(&messages[i], prev_non_system);
297 if !orphaned_ids.is_empty() {
298 tracing::warn!(
299 tool_use_ids = ?orphaned_ids,
300 index = i,
301 "stripping orphaned mid-history tool_result parts from user message"
302 );
303 messages[i].parts.retain(|p| {
304 !matches!(p, MessagePart::ToolResult { tool_use_id, .. } if orphaned_ids.contains(tool_use_id.as_str()))
305 });
306
307 let is_empty =
308 !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
309 if is_empty {
310 if let Some(db_id) = messages[i].metadata.db_id {
311 db_ids.push(db_id);
312 }
313 messages.remove(i);
314 removed += 1;
315 continue;
317 }
318 }
319 }
320
321 i += 1;
322 }
323 (removed, db_ids)
324}
325
326impl<C: Channel> Agent<C> {
327 pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
333 let (Some(memory), Some(cid)) = (
334 &self.memory_state.persistence.memory,
335 self.memory_state.persistence.conversation_id,
336 ) else {
337 return Ok(());
338 };
339
340 let history = memory
341 .sqlite()
342 .load_history_filtered(
343 cid,
344 self.memory_state.persistence.history_limit,
345 Some(true),
346 None,
347 )
348 .await?;
349 if !history.is_empty() {
350 let mut loaded = 0;
351 let mut skipped = 0;
352
353 for msg in history {
354 if !has_meaningful_content(&msg.content) && msg.parts.is_empty() {
359 tracing::warn!("skipping empty message from history (role: {:?})", msg.role);
360 skipped += 1;
361 continue;
362 }
363 self.msg.messages.push(msg);
364 loaded += 1;
365 }
366
367 let history_start = self.msg.messages.len() - loaded;
369 let mut restored_slice = self.msg.messages.split_off(history_start);
370 let (orphans, orphan_db_ids) = sanitize_tool_pairs(&mut restored_slice);
371 skipped += orphans;
372 loaded = loaded.saturating_sub(orphans);
373 self.msg.messages.append(&mut restored_slice);
374
375 if !orphan_db_ids.is_empty() {
376 let ids: Vec<zeph_memory::types::MessageId> = orphan_db_ids
377 .iter()
378 .map(|&id| zeph_memory::types::MessageId(id))
379 .collect();
380 if let Err(e) = memory.sqlite().soft_delete_messages(&ids).await {
381 tracing::warn!(
382 count = ids.len(),
383 error = %e,
384 "failed to soft-delete orphaned tool-pair messages from DB"
385 );
386 } else {
387 tracing::debug!(
388 count = ids.len(),
389 "soft-deleted orphaned tool-pair messages from DB"
390 );
391 }
392 }
393
394 tracing::info!("restored {loaded} message(s) from conversation {cid}");
395 if skipped > 0 {
396 tracing::warn!("skipped {skipped} empty/orphaned message(s) from history");
397 }
398
399 if loaded > 0 {
400 let _ = memory
403 .sqlite()
404 .increment_session_counts_for_conversation(cid)
405 .await
406 .inspect_err(|e| {
407 tracing::warn!(error = %e, "failed to increment tier session counts");
408 });
409 }
410 }
411
412 if let Ok(count) = memory.message_count(cid).await {
413 let count_u64 = u64::try_from(count).unwrap_or(0);
414 self.update_metrics(|m| {
415 m.sqlite_message_count = count_u64;
416 });
417 }
418
419 if let Ok(count) = memory.sqlite().count_semantic_facts().await {
420 let count_u64 = u64::try_from(count).unwrap_or(0);
421 self.update_metrics(|m| {
422 m.semantic_fact_count = count_u64;
423 });
424 }
425
426 if let Ok(count) = memory.unsummarized_message_count(cid).await {
427 self.memory_state.persistence.unsummarized_count = usize::try_from(count).unwrap_or(0);
428 }
429
430 self.recompute_prompt_tokens();
431 Ok(())
432 }
433
434 #[allow(clippy::too_many_lines)]
440 #[cfg_attr(
441 feature = "profiling",
442 tracing::instrument(name = "agent.persist_message", skip_all)
443 )]
444 pub(crate) async fn persist_message(
445 &mut self,
446 role: Role,
447 content: &str,
448 parts: &[MessagePart],
449 has_injection_flags: bool,
450 ) {
451 let (Some(memory), Some(cid)) = (
452 &self.memory_state.persistence.memory,
453 self.memory_state.persistence.conversation_id,
454 ) else {
455 return;
456 };
457
458 let parts_json = if parts.is_empty() {
459 "[]".to_string()
460 } else {
461 match serde_json::to_string(parts) {
462 Ok(json) => json,
463 Err(e) => {
464 tracing::error!(
465 role = ?role,
466 parts_count = parts.len(),
467 error = %e,
468 "failed to serialize message parts — skipping persist to avoid orphaned tool pair"
469 );
470 return;
471 }
472 }
473 };
474
475 let guard_event = self
479 .security
480 .exfiltration_guard
481 .should_guard_memory_write(has_injection_flags);
482 if let Some(ref event) = guard_event {
483 tracing::warn!(
484 ?event,
485 "exfiltration guard: skipping Qdrant embedding for flagged content"
486 );
487 self.update_metrics(|m| m.exfiltration_memory_guards += 1);
488 self.push_security_event(
489 crate::metrics::SecurityEventCategory::ExfiltrationBlock,
490 "memory_write",
491 "Qdrant embedding skipped: flagged content",
492 );
493 }
494
495 let skip_embedding = guard_event.is_some();
496
497 let has_skipped_tool_result = parts.iter().any(|p| {
501 if let MessagePart::ToolResult { content, .. } = p {
502 content.starts_with("[skipped]") || content.starts_with("[stopped]")
503 } else {
504 false
505 }
506 });
507
508 let should_embed = if skip_embedding || has_skipped_tool_result {
509 false
510 } else {
511 match role {
512 Role::Assistant => {
513 self.memory_state.persistence.autosave_assistant
514 && content.len() >= self.memory_state.persistence.autosave_min_length
515 }
516 _ => true,
517 }
518 };
519
520 let goal_text = self.memory_state.extraction.goal_text.clone();
521
522 tracing::debug!(
523 "persist_message: calling remember_with_parts, embed dispatched to background"
524 );
525 let (embedding_stored, was_persisted) = if should_embed {
526 match memory
527 .remember_with_parts(
528 cid,
529 role_str(role),
530 content,
531 &parts_json,
532 goal_text.as_deref(),
533 )
534 .await
535 {
536 Ok((Some(message_id), stored)) => {
537 self.msg.last_persisted_message_id = Some(message_id.0);
538 (stored, true)
539 }
540 Ok((None, _)) => {
541 return;
543 }
544 Err(e) => {
545 tracing::error!("failed to persist message: {e:#}");
546 return;
547 }
548 }
549 } else {
550 match memory
551 .save_only(cid, role_str(role), content, &parts_json)
552 .await
553 {
554 Ok(message_id) => {
555 self.msg.last_persisted_message_id = Some(message_id.0);
556 (false, true)
557 }
558 Err(e) => {
559 tracing::error!("failed to persist message: {e:#}");
560 return;
561 }
562 }
563 };
564
565 if !was_persisted {
566 return;
567 }
568
569 self.memory_state.persistence.unsummarized_count += 1;
570
571 self.update_metrics(|m| {
572 m.sqlite_message_count += 1;
573 if embedding_stored {
574 m.embeddings_generated += 1;
575 }
576 });
577
578 tracing::debug!("persist_message: db insert complete, embedding running in background");
579 memory.reap_embed_tasks();
580
581 self.enqueue_summarization_task();
585
586 let has_tool_result_parts = parts
589 .iter()
590 .any(|p| matches!(p, MessagePart::ToolResult { .. }));
591
592 self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
593 .await;
594
595 if role == Role::User && !has_tool_result_parts && !has_injection_flags {
597 self.enqueue_persona_extraction_task();
598 }
599
600 if has_tool_result_parts {
602 self.enqueue_trajectory_extraction_task();
603 }
604 }
605
606 fn enqueue_summarization_task(&mut self) {
608 let (Some(memory), Some(cid)) = (
609 self.memory_state.persistence.memory.clone(),
610 self.memory_state.persistence.conversation_id,
611 ) else {
612 return;
613 };
614
615 if self.memory_state.persistence.unsummarized_count
616 <= self.memory_state.compaction.summarization_threshold
617 {
618 return;
619 }
620
621 let batch_size = self.memory_state.compaction.summarization_threshold / 2;
622
623 self.lifecycle.supervisor.spawn_summarization("summarization", async move {
624 match tokio::time::timeout(
625 std::time::Duration::from_secs(30),
626 memory.summarize(cid, batch_size),
627 )
628 .await
629 {
630 Ok(Ok(Some(summary_id))) => {
631 tracing::info!(
632 "background summarization: created summary {summary_id} for conversation {cid}"
633 );
634 true
635 }
636 Ok(Ok(None)) => {
637 tracing::debug!("background summarization: no summarization needed");
638 false
639 }
640 Ok(Err(e)) => {
641 tracing::error!("background summarization failed: {e:#}");
642 false
643 }
644 Err(_) => {
645 tracing::warn!("background summarization timed out after 30s");
646 false
647 }
648 }
649 });
650 }
651
652 #[allow(clippy::too_many_lines)]
657 async fn enqueue_graph_extraction_task(
658 &mut self,
659 content: &str,
660 has_injection_flags: bool,
661 has_tool_result_parts: bool,
662 ) {
663 use zeph_memory::semantic::GraphExtractionConfig;
664
665 if self.memory_state.persistence.memory.is_none()
666 || self.memory_state.persistence.conversation_id.is_none()
667 {
668 return;
669 }
670 if has_tool_result_parts {
671 tracing::debug!("graph extraction skipped: message contains ToolResult parts");
672 return;
673 }
674 if has_injection_flags {
675 tracing::warn!("graph extraction skipped: injection patterns detected in content");
676 return;
677 }
678
679 let extraction_cfg = {
680 let cfg = &self.memory_state.extraction.graph_config;
681 if !cfg.enabled {
682 return;
683 }
684 GraphExtractionConfig {
685 max_entities: cfg.max_entities_per_message,
686 max_edges: cfg.max_edges_per_message,
687 extraction_timeout_secs: cfg.extraction_timeout_secs,
688 community_refresh_interval: cfg.community_refresh_interval,
689 expired_edge_retention_days: cfg.expired_edge_retention_days,
690 max_entities_cap: cfg.max_entities,
691 community_summary_max_prompt_bytes: cfg.community_summary_max_prompt_bytes,
692 community_summary_concurrency: cfg.community_summary_concurrency,
693 lpa_edge_chunk_size: cfg.lpa_edge_chunk_size,
694 note_linking: zeph_memory::NoteLinkingConfig {
695 enabled: cfg.note_linking.enabled,
696 similarity_threshold: cfg.note_linking.similarity_threshold,
697 top_k: cfg.note_linking.top_k,
698 timeout_secs: cfg.note_linking.timeout_secs,
699 },
700 link_weight_decay_lambda: cfg.link_weight_decay_lambda,
701 link_weight_decay_interval_secs: cfg.link_weight_decay_interval_secs,
702 belief_revision_enabled: cfg.belief_revision.enabled,
703 belief_revision_similarity_threshold: cfg.belief_revision.similarity_threshold,
704 conversation_id: self.memory_state.persistence.conversation_id.map(|c| c.0),
705 }
706 };
707
708 if self.rpe_should_skip(content).await {
711 tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
712 return;
713 }
714
715 let context_messages: Vec<String> = self
716 .msg
717 .messages
718 .iter()
719 .rev()
720 .filter(|m| {
721 m.role == Role::User
722 && !m
723 .parts
724 .iter()
725 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
726 })
727 .take(4)
728 .map(|m| {
729 if m.content.len() > 2048 {
730 m.content[..m.content.floor_char_boundary(2048)].to_owned()
731 } else {
732 m.content.clone()
733 }
734 })
735 .collect();
736
737 let Some(memory) = self.memory_state.persistence.memory.clone() else {
738 return;
739 };
740
741 let validator: zeph_memory::semantic::PostExtractValidator =
742 if self.security.memory_validator.is_enabled() {
743 let v = self.security.memory_validator.clone();
744 Some(Box::new(move |result| {
745 v.validate_graph_extraction(result)
746 .map_err(|e| e.to_string())
747 }))
748 } else {
749 None
750 };
751
752 let content_owned = content.to_owned();
753 let graph_store = memory.graph_store.clone();
754 let metrics_tx = self.metrics.metrics_tx.clone();
755 let start_time = self.lifecycle.start_time;
756
757 self.lifecycle.supervisor.spawn(
758 super::agent_supervisor::TaskClass::Enrichment,
759 "graph_extraction",
760 async move {
761 let extraction_handle = memory.spawn_graph_extraction(
762 content_owned,
763 context_messages,
764 extraction_cfg,
765 validator,
766 );
767
768 if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
770 let _ = extraction_handle.await;
771 let (entities, edges, communities) = tokio::join!(
772 store.entity_count(),
773 store.active_edge_count(),
774 store.community_count()
775 );
776 let elapsed = start_time.elapsed().as_secs();
777 tx.send_modify(|m| {
778 m.uptime_seconds = elapsed;
779 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
780 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
781 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
782 });
783 } else {
784 let _ = extraction_handle.await;
785 }
786
787 tracing::debug!("background graph extraction complete");
788 },
789 );
790
791 self.sync_community_detection_failures();
793 self.sync_graph_extraction_metrics();
794 let memory_for_sync = self.memory_state.persistence.memory.clone();
796 let metrics_tx_sync = self.metrics.metrics_tx.clone();
797 let start_time_sync = self.lifecycle.start_time;
798 let cid_sync = self.memory_state.persistence.conversation_id;
799 let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
800 let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
801 let guidelines_enabled = self.memory_state.extraction.graph_config.enabled;
802
803 self.lifecycle.supervisor.spawn(
804 super::agent_supervisor::TaskClass::Telemetry,
805 "graph_count_sync",
806 async move {
807 let Some(store) = graph_store_sync else {
808 return;
809 };
810 let Some(tx) = metrics_tx_sync else { return };
811
812 let (entities, edges, communities) = tokio::join!(
813 store.entity_count(),
814 store.active_edge_count(),
815 store.community_count()
816 );
817 let elapsed = start_time_sync.elapsed().as_secs();
818 tx.send_modify(|m| {
819 m.uptime_seconds = elapsed;
820 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
821 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
822 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
823 });
824
825 if guidelines_enabled && let Some(sqlite) = sqlite_sync {
827 match tokio::time::timeout(
828 std::time::Duration::from_secs(10),
829 sqlite.load_compression_guidelines_meta(cid_sync),
830 )
831 .await
832 {
833 Ok(Ok((version, created_at))) => {
834 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
835 let version_u32 = u32::try_from(version).unwrap_or(0);
836 tx.send_modify(|m| {
837 m.guidelines_version = version_u32;
838 m.guidelines_updated_at = created_at;
839 });
840 }
841 Ok(Err(e)) => {
842 tracing::debug!("guidelines status sync failed: {e:#}");
843 }
844 Err(_) => {
845 tracing::debug!("guidelines status sync timed out");
846 }
847 }
848 }
849 },
850 );
851 }
852
853 fn enqueue_persona_extraction_task(&mut self) {
855 use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
856
857 let cfg = &self.memory_state.extraction.persona_config;
858 if !cfg.enabled {
859 return;
860 }
861
862 let Some(memory) = &self.memory_state.persistence.memory else {
863 return;
864 };
865
866 let user_messages: Vec<String> = self
867 .msg
868 .messages
869 .iter()
870 .filter(|m| {
871 m.role == Role::User
872 && !m
873 .parts
874 .iter()
875 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
876 })
877 .take(8)
878 .map(|m| {
879 if m.content.len() > 2048 {
880 m.content[..m.content.floor_char_boundary(2048)].to_owned()
881 } else {
882 m.content.clone()
883 }
884 })
885 .collect();
886
887 if user_messages.len() < cfg.min_messages {
888 return;
889 }
890
891 let timeout_secs = cfg.extraction_timeout_secs;
892 let extraction_cfg = PersonaExtractionConfig {
893 enabled: cfg.enabled,
894 min_messages: cfg.min_messages,
895 max_messages: cfg.max_messages,
896 extraction_timeout_secs: timeout_secs,
897 };
898
899 let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
900 let store = memory.sqlite().clone();
901 let conversation_id = self.memory_state.persistence.conversation_id.map(|c| c.0);
902
903 self.lifecycle.supervisor.spawn(
904 super::agent_supervisor::TaskClass::Enrichment,
905 "persona_extraction",
906 async move {
907 let user_message_refs: Vec<&str> =
908 user_messages.iter().map(String::as_str).collect();
909 let fut = extract_persona_facts(
910 &store,
911 &provider,
912 &user_message_refs,
913 &extraction_cfg,
914 conversation_id,
915 );
916 match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
917 {
918 Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
919 Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
920 Err(_) => tracing::warn!(
921 timeout_secs,
922 "persona extraction timed out — no facts written this turn"
923 ),
924 }
925 },
926 );
927 }
928
929 fn enqueue_trajectory_extraction_task(&mut self) {
931 use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
932
933 let cfg = self.memory_state.extraction.trajectory_config.clone();
934 if !cfg.enabled {
935 return;
936 }
937
938 let Some(memory) = &self.memory_state.persistence.memory else {
939 return;
940 };
941
942 let conversation_id = match self.memory_state.persistence.conversation_id {
943 Some(cid) => cid.0,
944 None => return,
945 };
946
947 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
948 let turn_messages: Vec<zeph_llm::provider::Message> =
949 self.msg.messages[tail_start..].to_vec();
950
951 if turn_messages.is_empty() {
952 return;
953 }
954
955 let extraction_cfg = TrajectoryExtractionConfig {
956 enabled: cfg.enabled,
957 max_messages: cfg.max_messages,
958 extraction_timeout_secs: cfg.extraction_timeout_secs,
959 };
960
961 let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
962 let store = memory.sqlite().clone();
963 let min_confidence = cfg.min_confidence;
964
965 self.lifecycle.supervisor.spawn(
966 super::agent_supervisor::TaskClass::Enrichment,
967 "trajectory_extraction",
968 async move {
969 let entries =
970 match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
971 .await
972 {
973 Ok(e) => e,
974 Err(e) => {
975 tracing::warn!(error = %e, "trajectory extraction failed");
976 return;
977 }
978 };
979
980 let last_id = store
981 .trajectory_last_extracted_message_id(conversation_id)
982 .await
983 .unwrap_or(0);
984
985 let mut max_id = last_id;
986 for entry in &entries {
987 if entry.confidence < min_confidence {
988 continue;
989 }
990 let tools_json = serde_json::to_string(&entry.tools_used)
991 .unwrap_or_else(|_| "[]".to_string());
992 match store
993 .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
994 conversation_id: Some(conversation_id),
995 turn_index: 0,
996 kind: &entry.kind,
997 intent: &entry.intent,
998 outcome: &entry.outcome,
999 tools_used: &tools_json,
1000 confidence: entry.confidence,
1001 })
1002 .await
1003 {
1004 Ok(id) => {
1005 if id > max_id {
1006 max_id = id;
1007 }
1008 }
1009 Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
1010 }
1011 }
1012
1013 if max_id > last_id {
1014 let _ = store
1015 .set_trajectory_last_extracted_message_id(conversation_id, max_id)
1016 .await;
1017 }
1018
1019 tracing::debug!(
1020 count = entries.len(),
1021 conversation_id,
1022 "trajectory extraction complete"
1023 );
1024 },
1025 );
1026 }
1027
1028 async fn rpe_should_skip(&mut self, content: &str) -> bool {
1033 let Some(ref rpe_mutex) = self.memory_state.extraction.rpe_router else {
1034 return false;
1035 };
1036 let Some(memory) = &self.memory_state.persistence.memory else {
1037 return false;
1038 };
1039 let candidates = zeph_memory::extract_candidate_entities(content);
1040 let provider = memory.provider();
1041 let Ok(Ok(emb_vec)) =
1042 tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
1043 else {
1044 return false; };
1046 if let Ok(mut router) = rpe_mutex.lock() {
1047 let signal = router.compute(&emb_vec, &candidates);
1048 router.push_embedding(emb_vec);
1049 router.push_entities(&candidates);
1050 !signal.should_extract
1051 } else {
1052 tracing::warn!("rpe_router mutex poisoned; falling through to extract");
1053 false
1054 }
1055 }
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060 use super::super::agent_tests::{
1061 MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
1062 };
1063 use super::*;
1064 use zeph_llm::any::AnyProvider;
1065 use zeph_memory::semantic::SemanticMemory;
1066
1067 async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
1068 SemanticMemory::new(
1069 ":memory:",
1070 "http://127.0.0.1:1",
1071 provider.clone(),
1072 "test-model",
1073 )
1074 .await
1075 .unwrap()
1076 }
1077
1078 #[tokio::test]
1079 async fn load_history_without_memory_returns_ok() {
1080 let provider = mock_provider(vec![]);
1081 let channel = MockChannel::new(vec![]);
1082 let registry = create_test_registry();
1083 let executor = MockToolExecutor::no_tools();
1084 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1085
1086 let result = agent.load_history().await;
1087 assert!(result.is_ok());
1088 assert_eq!(agent.msg.messages.len(), 1); }
1091
1092 #[tokio::test]
1093 async fn load_history_with_messages_injects_into_agent() {
1094 let provider = mock_provider(vec![]);
1095 let channel = MockChannel::new(vec![]);
1096 let registry = create_test_registry();
1097 let executor = MockToolExecutor::no_tools();
1098
1099 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1100 let cid = memory.sqlite().create_conversation().await.unwrap();
1101
1102 memory
1103 .sqlite()
1104 .save_message(cid, "user", "hello from history")
1105 .await
1106 .unwrap();
1107 memory
1108 .sqlite()
1109 .save_message(cid, "assistant", "hi back")
1110 .await
1111 .unwrap();
1112
1113 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1114 std::sync::Arc::new(memory),
1115 cid,
1116 50,
1117 5,
1118 100,
1119 );
1120
1121 let messages_before = agent.msg.messages.len();
1122 agent.load_history().await.unwrap();
1123 assert_eq!(agent.msg.messages.len(), messages_before + 2);
1125 }
1126
1127 #[tokio::test]
1128 async fn load_history_skips_empty_messages() {
1129 let provider = mock_provider(vec![]);
1130 let channel = MockChannel::new(vec![]);
1131 let registry = create_test_registry();
1132 let executor = MockToolExecutor::no_tools();
1133
1134 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1135 let cid = memory.sqlite().create_conversation().await.unwrap();
1136
1137 memory
1139 .sqlite()
1140 .save_message(cid, "user", " ")
1141 .await
1142 .unwrap();
1143 memory
1144 .sqlite()
1145 .save_message(cid, "user", "real message")
1146 .await
1147 .unwrap();
1148
1149 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1150 std::sync::Arc::new(memory),
1151 cid,
1152 50,
1153 5,
1154 100,
1155 );
1156
1157 let messages_before = agent.msg.messages.len();
1158 agent.load_history().await.unwrap();
1159 assert_eq!(agent.msg.messages.len(), messages_before + 1);
1161 }
1162
1163 #[tokio::test]
1164 async fn load_history_with_empty_store_returns_ok() {
1165 let provider = mock_provider(vec![]);
1166 let channel = MockChannel::new(vec![]);
1167 let registry = create_test_registry();
1168 let executor = MockToolExecutor::no_tools();
1169
1170 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1171 let cid = memory.sqlite().create_conversation().await.unwrap();
1172
1173 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1174 std::sync::Arc::new(memory),
1175 cid,
1176 50,
1177 5,
1178 100,
1179 );
1180
1181 let messages_before = agent.msg.messages.len();
1182 agent.load_history().await.unwrap();
1183 assert_eq!(agent.msg.messages.len(), messages_before);
1185 }
1186
1187 #[tokio::test]
1188 async fn load_history_increments_session_count_for_existing_messages() {
1189 let provider = mock_provider(vec![]);
1190 let channel = MockChannel::new(vec![]);
1191 let registry = create_test_registry();
1192 let executor = MockToolExecutor::no_tools();
1193
1194 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1195 let cid = memory.sqlite().create_conversation().await.unwrap();
1196
1197 let id1 = memory
1199 .sqlite()
1200 .save_message(cid, "user", "hello")
1201 .await
1202 .unwrap();
1203 let id2 = memory
1204 .sqlite()
1205 .save_message(cid, "assistant", "hi")
1206 .await
1207 .unwrap();
1208
1209 let memory_arc = std::sync::Arc::new(memory);
1210 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1211 memory_arc.clone(),
1212 cid,
1213 50,
1214 5,
1215 100,
1216 );
1217
1218 agent.load_history().await.unwrap();
1219
1220 let counts: Vec<i64> = zeph_db::query_scalar(
1222 "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
1223 )
1224 .bind(id1)
1225 .bind(id2)
1226 .fetch_all(memory_arc.sqlite().pool())
1227 .await
1228 .unwrap();
1229 assert_eq!(
1230 counts,
1231 vec![1, 1],
1232 "session_count must be 1 after first restore"
1233 );
1234 }
1235
1236 #[tokio::test]
1237 async fn load_history_does_not_increment_session_count_for_new_conversation() {
1238 let provider = mock_provider(vec![]);
1239 let channel = MockChannel::new(vec![]);
1240 let registry = create_test_registry();
1241 let executor = MockToolExecutor::no_tools();
1242
1243 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1244 let cid = memory.sqlite().create_conversation().await.unwrap();
1245
1246 let memory_arc = std::sync::Arc::new(memory);
1248 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1249 memory_arc.clone(),
1250 cid,
1251 50,
1252 5,
1253 100,
1254 );
1255
1256 agent.load_history().await.unwrap();
1257
1258 let counts: Vec<i64> =
1260 zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
1261 .bind(cid)
1262 .fetch_all(memory_arc.sqlite().pool())
1263 .await
1264 .unwrap();
1265 assert!(counts.is_empty(), "new conversation must have no messages");
1266 }
1267
1268 #[tokio::test]
1269 async fn persist_message_without_memory_silently_returns() {
1270 let provider = mock_provider(vec![]);
1272 let channel = MockChannel::new(vec![]);
1273 let registry = create_test_registry();
1274 let executor = MockToolExecutor::no_tools();
1275 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1276
1277 agent.persist_message(Role::User, "hello", &[], false).await;
1279 }
1280
1281 #[tokio::test]
1282 async fn persist_message_assistant_autosave_false_uses_save_only() {
1283 let provider = mock_provider(vec![]);
1284 let channel = MockChannel::new(vec![]);
1285 let registry = create_test_registry();
1286 let executor = MockToolExecutor::no_tools();
1287
1288 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1289 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1290 let cid = memory.sqlite().create_conversation().await.unwrap();
1291
1292 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1293 .with_metrics(tx)
1294 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1295 agent.memory_state.persistence.autosave_assistant = false;
1296 agent.memory_state.persistence.autosave_min_length = 20;
1297
1298 agent
1299 .persist_message(Role::Assistant, "short assistant reply", &[], false)
1300 .await;
1301
1302 let history = agent
1303 .memory_state
1304 .persistence
1305 .memory
1306 .as_ref()
1307 .unwrap()
1308 .sqlite()
1309 .load_history(cid, 50)
1310 .await
1311 .unwrap();
1312 assert_eq!(history.len(), 1, "message must be saved");
1313 assert_eq!(history[0].content, "short assistant reply");
1314 assert_eq!(rx.borrow().embeddings_generated, 0);
1316 }
1317
1318 #[tokio::test]
1319 async fn persist_message_assistant_below_min_length_uses_save_only() {
1320 let provider = mock_provider(vec![]);
1321 let channel = MockChannel::new(vec![]);
1322 let registry = create_test_registry();
1323 let executor = MockToolExecutor::no_tools();
1324
1325 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1326 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1327 let cid = memory.sqlite().create_conversation().await.unwrap();
1328
1329 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1331 .with_metrics(tx)
1332 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1333 agent.memory_state.persistence.autosave_assistant = true;
1334 agent.memory_state.persistence.autosave_min_length = 1000;
1335
1336 agent
1337 .persist_message(Role::Assistant, "too short", &[], false)
1338 .await;
1339
1340 let history = agent
1341 .memory_state
1342 .persistence
1343 .memory
1344 .as_ref()
1345 .unwrap()
1346 .sqlite()
1347 .load_history(cid, 50)
1348 .await
1349 .unwrap();
1350 assert_eq!(history.len(), 1, "message must be saved");
1351 assert_eq!(history[0].content, "too short");
1352 assert_eq!(rx.borrow().embeddings_generated, 0);
1353 }
1354
1355 #[tokio::test]
1356 async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1357 let provider = mock_provider(vec![]);
1359 let channel = MockChannel::new(vec![]);
1360 let registry = create_test_registry();
1361 let executor = MockToolExecutor::no_tools();
1362
1363 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1364 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1365 let cid = memory.sqlite().create_conversation().await.unwrap();
1366
1367 let min_length = 10usize;
1368 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1369 .with_metrics(tx)
1370 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1371 agent.memory_state.persistence.autosave_assistant = true;
1372 agent.memory_state.persistence.autosave_min_length = min_length;
1373
1374 let content_at_boundary = "A".repeat(min_length);
1376 assert_eq!(content_at_boundary.len(), min_length);
1377 agent
1378 .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1379 .await;
1380
1381 assert_eq!(rx.borrow().sqlite_message_count, 1);
1383 }
1384
1385 #[tokio::test]
1386 async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1387 let provider = mock_provider(vec![]);
1389 let channel = MockChannel::new(vec![]);
1390 let registry = create_test_registry();
1391 let executor = MockToolExecutor::no_tools();
1392
1393 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1394 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1395 let cid = memory.sqlite().create_conversation().await.unwrap();
1396
1397 let min_length = 10usize;
1398 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1399 .with_metrics(tx)
1400 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1401 agent.memory_state.persistence.autosave_assistant = true;
1402 agent.memory_state.persistence.autosave_min_length = min_length;
1403
1404 let content_below_boundary = "A".repeat(min_length - 1);
1406 assert_eq!(content_below_boundary.len(), min_length - 1);
1407 agent
1408 .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1409 .await;
1410
1411 let history = agent
1412 .memory_state
1413 .persistence
1414 .memory
1415 .as_ref()
1416 .unwrap()
1417 .sqlite()
1418 .load_history(cid, 50)
1419 .await
1420 .unwrap();
1421 assert_eq!(history.len(), 1, "message must still be saved");
1422 assert_eq!(rx.borrow().embeddings_generated, 0);
1424 }
1425
1426 #[tokio::test]
1427 async fn persist_message_increments_unsummarized_count() {
1428 let provider = mock_provider(vec![]);
1429 let channel = MockChannel::new(vec![]);
1430 let registry = create_test_registry();
1431 let executor = MockToolExecutor::no_tools();
1432
1433 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1434 let cid = memory.sqlite().create_conversation().await.unwrap();
1435
1436 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1438 std::sync::Arc::new(memory),
1439 cid,
1440 50,
1441 5,
1442 100,
1443 );
1444
1445 assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1446
1447 agent.persist_message(Role::User, "first", &[], false).await;
1448 assert_eq!(agent.memory_state.persistence.unsummarized_count, 1);
1449
1450 agent
1451 .persist_message(Role::User, "second", &[], false)
1452 .await;
1453 assert_eq!(agent.memory_state.persistence.unsummarized_count, 2);
1454 }
1455
1456 #[tokio::test]
1457 async fn check_summarization_resets_counter_on_success() {
1458 let provider = mock_provider(vec![]);
1459 let channel = MockChannel::new(vec![]);
1460 let registry = create_test_registry();
1461 let executor = MockToolExecutor::no_tools();
1462
1463 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1464 let cid = memory.sqlite().create_conversation().await.unwrap();
1465
1466 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1468 std::sync::Arc::new(memory),
1469 cid,
1470 50,
1471 5,
1472 1,
1473 );
1474
1475 agent.persist_message(Role::User, "msg1", &[], false).await;
1476 agent.persist_message(Role::User, "msg2", &[], false).await;
1477
1478 assert!(agent.memory_state.persistence.unsummarized_count <= 2);
1483 }
1484
1485 #[tokio::test]
1486 async fn unsummarized_count_not_incremented_without_memory() {
1487 let provider = mock_provider(vec![]);
1488 let channel = MockChannel::new(vec![]);
1489 let registry = create_test_registry();
1490 let executor = MockToolExecutor::no_tools();
1491 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1492
1493 agent.persist_message(Role::User, "hello", &[], false).await;
1494 assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1496 }
1497
1498 mod graph_extraction_guards {
1500 use super::*;
1501 use crate::config::GraphConfig;
1502 use zeph_llm::provider::MessageMetadata;
1503 use zeph_memory::graph::GraphStore;
1504
1505 fn enabled_graph_config() -> GraphConfig {
1506 GraphConfig {
1507 enabled: true,
1508 ..GraphConfig::default()
1509 }
1510 }
1511
1512 async fn agent_with_graph(
1513 provider: &AnyProvider,
1514 config: GraphConfig,
1515 ) -> Agent<MockChannel> {
1516 let memory =
1517 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1518 let cid = memory.sqlite().create_conversation().await.unwrap();
1519 Agent::new(
1520 provider.clone(),
1521 MockChannel::new(vec![]),
1522 create_test_registry(),
1523 None,
1524 5,
1525 MockToolExecutor::no_tools(),
1526 )
1527 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1528 .with_graph_config(config)
1529 }
1530
1531 #[tokio::test]
1532 async fn injection_flag_guard_skips_extraction() {
1533 let provider = mock_provider(vec![]);
1535 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1536 let pool = agent
1537 .memory_state
1538 .persistence
1539 .memory
1540 .as_ref()
1541 .unwrap()
1542 .sqlite()
1543 .pool()
1544 .clone();
1545
1546 agent
1547 .enqueue_graph_extraction_task("I use Rust", true, false)
1548 .await;
1549
1550 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1552
1553 let store = GraphStore::new(pool);
1554 let count = store.get_metadata("extraction_count").await.unwrap();
1555 assert!(
1556 count.is_none(),
1557 "injection flag must prevent extraction_count from being written"
1558 );
1559 }
1560
1561 #[tokio::test]
1562 async fn disabled_config_guard_skips_extraction() {
1563 let provider = mock_provider(vec![]);
1565 let disabled_cfg = GraphConfig {
1566 enabled: false,
1567 ..GraphConfig::default()
1568 };
1569 let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1570 let pool = agent
1571 .memory_state
1572 .persistence
1573 .memory
1574 .as_ref()
1575 .unwrap()
1576 .sqlite()
1577 .pool()
1578 .clone();
1579
1580 agent
1581 .enqueue_graph_extraction_task("I use Rust", false, false)
1582 .await;
1583
1584 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1585
1586 let store = GraphStore::new(pool);
1587 let count = store.get_metadata("extraction_count").await.unwrap();
1588 assert!(
1589 count.is_none(),
1590 "disabled graph config must prevent extraction"
1591 );
1592 }
1593
1594 #[tokio::test]
1595 async fn happy_path_fires_extraction() {
1596 let provider = mock_provider(vec![]);
1599 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1600 let pool = agent
1601 .memory_state
1602 .persistence
1603 .memory
1604 .as_ref()
1605 .unwrap()
1606 .sqlite()
1607 .pool()
1608 .clone();
1609
1610 agent
1611 .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1612 .await;
1613
1614 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1616
1617 let store = GraphStore::new(pool);
1618 let count = store.get_metadata("extraction_count").await.unwrap();
1619 assert!(
1620 count.is_some(),
1621 "happy-path extraction must increment extraction_count"
1622 );
1623 }
1624
1625 #[tokio::test]
1626 async fn tool_result_parts_guard_skips_extraction() {
1627 let provider = mock_provider(vec![]);
1631 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1632 let pool = agent
1633 .memory_state
1634 .persistence
1635 .memory
1636 .as_ref()
1637 .unwrap()
1638 .sqlite()
1639 .pool()
1640 .clone();
1641
1642 agent
1643 .enqueue_graph_extraction_task(
1644 "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1645 false,
1646 true, )
1648 .await;
1649
1650 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1651
1652 let store = GraphStore::new(pool);
1653 let count = store.get_metadata("extraction_count").await.unwrap();
1654 assert!(
1655 count.is_none(),
1656 "tool result message must not trigger graph extraction"
1657 );
1658 }
1659
1660 #[tokio::test]
1661 async fn context_filter_excludes_tool_result_messages() {
1662 let provider = mock_provider(vec![]);
1673 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1674
1675 agent.msg.messages.push(Message {
1678 role: Role::User,
1679 content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1680 parts: vec![MessagePart::ToolResult {
1681 tool_use_id: "abc".to_owned(),
1682 content: "provider_type = \"openai\"".to_owned(),
1683 is_error: false,
1684 }],
1685 metadata: MessageMetadata::default(),
1686 });
1687
1688 let pool = agent
1689 .memory_state
1690 .persistence
1691 .memory
1692 .as_ref()
1693 .unwrap()
1694 .sqlite()
1695 .pool()
1696 .clone();
1697
1698 agent
1700 .enqueue_graph_extraction_task(
1701 "I prefer Rust for systems programming",
1702 false,
1703 false,
1704 )
1705 .await;
1706
1707 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1708
1709 let store = GraphStore::new(pool);
1711 let count = store.get_metadata("extraction_count").await.unwrap();
1712 assert!(
1713 count.is_some(),
1714 "conversational message must trigger extraction even with prior tool result in history"
1715 );
1716 }
1717 }
1718
1719 mod persona_extraction_guards {
1721 use super::*;
1722 use zeph_config::PersonaConfig;
1723 use zeph_llm::provider::MessageMetadata;
1724
1725 fn enabled_persona_config() -> PersonaConfig {
1726 PersonaConfig {
1727 enabled: true,
1728 min_messages: 1,
1729 ..PersonaConfig::default()
1730 }
1731 }
1732
1733 async fn agent_with_persona(
1734 provider: &AnyProvider,
1735 config: PersonaConfig,
1736 ) -> Agent<MockChannel> {
1737 let memory =
1738 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1739 let cid = memory.sqlite().create_conversation().await.unwrap();
1740 let mut agent = Agent::new(
1741 provider.clone(),
1742 MockChannel::new(vec![]),
1743 create_test_registry(),
1744 None,
1745 5,
1746 MockToolExecutor::no_tools(),
1747 )
1748 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1749 agent.memory_state.extraction.persona_config = config;
1750 agent
1751 }
1752
1753 #[tokio::test]
1754 async fn disabled_config_skips_spawn() {
1755 let provider = mock_provider(vec![]);
1757 let mut agent = agent_with_persona(
1758 &provider,
1759 PersonaConfig {
1760 enabled: false,
1761 ..PersonaConfig::default()
1762 },
1763 )
1764 .await;
1765
1766 agent.msg.messages.push(zeph_llm::provider::Message {
1768 role: Role::User,
1769 content: "I prefer Rust for systems programming".to_owned(),
1770 parts: vec![],
1771 metadata: MessageMetadata::default(),
1772 });
1773
1774 agent.enqueue_persona_extraction_task();
1775
1776 let store = agent
1777 .memory_state
1778 .persistence
1779 .memory
1780 .as_ref()
1781 .unwrap()
1782 .sqlite()
1783 .clone();
1784 let count = store.count_persona_facts().await.unwrap();
1785 assert_eq!(count, 0, "disabled persona config must not write any facts");
1786 }
1787
1788 #[tokio::test]
1789 async fn below_min_messages_skips_spawn() {
1790 let provider = mock_provider(vec![]);
1792 let mut agent = agent_with_persona(
1793 &provider,
1794 PersonaConfig {
1795 enabled: true,
1796 min_messages: 3,
1797 ..PersonaConfig::default()
1798 },
1799 )
1800 .await;
1801
1802 for text in ["I use Rust", "I prefer async code"] {
1803 agent.msg.messages.push(zeph_llm::provider::Message {
1804 role: Role::User,
1805 content: text.to_owned(),
1806 parts: vec![],
1807 metadata: MessageMetadata::default(),
1808 });
1809 }
1810
1811 agent.enqueue_persona_extraction_task();
1812
1813 let store = agent
1814 .memory_state
1815 .persistence
1816 .memory
1817 .as_ref()
1818 .unwrap()
1819 .sqlite()
1820 .clone();
1821 let count = store.count_persona_facts().await.unwrap();
1822 assert_eq!(
1823 count, 0,
1824 "below min_messages threshold must not trigger extraction"
1825 );
1826 }
1827
1828 #[tokio::test]
1829 async fn no_memory_skips_spawn() {
1830 let provider = mock_provider(vec![]);
1832 let channel = MockChannel::new(vec![]);
1833 let registry = create_test_registry();
1834 let executor = MockToolExecutor::no_tools();
1835 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1836 agent.memory_state.extraction.persona_config = enabled_persona_config();
1837 agent.msg.messages.push(zeph_llm::provider::Message {
1838 role: Role::User,
1839 content: "I like Rust".to_owned(),
1840 parts: vec![],
1841 metadata: MessageMetadata::default(),
1842 });
1843
1844 agent.enqueue_persona_extraction_task();
1846 }
1847
1848 #[tokio::test]
1849 async fn enabled_enough_messages_spawns_extraction() {
1850 use zeph_llm::mock::MockProvider;
1853 let (mock, recorded) = MockProvider::default().with_recording();
1854 let provider = AnyProvider::Mock(mock);
1855 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1856
1857 agent.msg.messages.push(zeph_llm::provider::Message {
1858 role: Role::User,
1859 content: "I prefer Rust for systems programming".to_owned(),
1860 parts: vec![],
1861 metadata: MessageMetadata::default(),
1862 });
1863
1864 agent.enqueue_persona_extraction_task();
1865
1866 agent.lifecycle.supervisor.join_all_for_test().await;
1868
1869 let calls = recorded.lock().unwrap();
1870 assert!(
1871 !calls.is_empty(),
1872 "happy-path: provider.chat() must be called when extraction completes"
1873 );
1874 }
1875
1876 #[tokio::test]
1877 async fn messages_capped_at_eight() {
1878 use zeph_llm::mock::MockProvider;
1881 let (mock, recorded) = MockProvider::default().with_recording();
1882 let provider = AnyProvider::Mock(mock);
1883 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1884
1885 for i in 0..12u32 {
1886 agent.msg.messages.push(zeph_llm::provider::Message {
1887 role: Role::User,
1888 content: format!("I like message {i}"),
1889 parts: vec![],
1890 metadata: MessageMetadata::default(),
1891 });
1892 }
1893
1894 agent.enqueue_persona_extraction_task();
1895
1896 agent.lifecycle.supervisor.join_all_for_test().await;
1898
1899 let calls = recorded.lock().unwrap();
1901 assert!(
1902 !calls.is_empty(),
1903 "extraction must run when enough messages present"
1904 );
1905 let prompt = &calls[0];
1907 let user_text = prompt
1908 .iter()
1909 .filter(|m| m.role == Role::User)
1910 .map(|m| m.content.as_str())
1911 .collect::<Vec<_>>()
1912 .join(" ");
1913 assert!(
1915 !user_text.contains("I like message 8"),
1916 "message index 8 must be excluded from extraction input"
1917 );
1918 }
1919
1920 #[test]
1921 fn long_message_truncated_at_char_boundary() {
1922 let long_content = "x".repeat(3000);
1926 let truncated = if long_content.len() > 2048 {
1927 long_content[..long_content.floor_char_boundary(2048)].to_owned()
1928 } else {
1929 long_content.clone()
1930 };
1931 assert_eq!(
1932 truncated.len(),
1933 2048,
1934 "ASCII content must be truncated to exactly 2048 bytes"
1935 );
1936
1937 let multi = "é".repeat(1500); let truncated_multi = if multi.len() > 2048 {
1940 multi[..multi.floor_char_boundary(2048)].to_owned()
1941 } else {
1942 multi.clone()
1943 };
1944 assert!(
1945 truncated_multi.len() <= 2048,
1946 "multi-byte content must not exceed 2048 bytes"
1947 );
1948 assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1949 }
1950 }
1951
1952 #[tokio::test]
1953 async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1954 let provider = mock_provider(vec![]);
1955 let channel = MockChannel::new(vec![]);
1956 let registry = create_test_registry();
1957 let executor = MockToolExecutor::no_tools();
1958
1959 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1960 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1961 let cid = memory.sqlite().create_conversation().await.unwrap();
1962
1963 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1965 .with_metrics(tx)
1966 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1967 agent.memory_state.persistence.autosave_assistant = false;
1968 agent.memory_state.persistence.autosave_min_length = 20;
1969
1970 let long_user_msg = "A".repeat(100);
1971 agent
1972 .persist_message(Role::User, &long_user_msg, &[], false)
1973 .await;
1974
1975 let history = agent
1976 .memory_state
1977 .persistence
1978 .memory
1979 .as_ref()
1980 .unwrap()
1981 .sqlite()
1982 .load_history(cid, 50)
1983 .await
1984 .unwrap();
1985 assert_eq!(history.len(), 1, "user message must be saved");
1986 assert_eq!(rx.borrow().sqlite_message_count, 1);
1989 }
1990
1991 #[tokio::test]
1995 async fn persist_message_saves_correct_tool_use_parts() {
1996 use zeph_llm::provider::MessagePart;
1997
1998 let provider = mock_provider(vec![]);
1999 let channel = MockChannel::new(vec![]);
2000 let registry = create_test_registry();
2001 let executor = MockToolExecutor::no_tools();
2002
2003 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2004 let cid = memory.sqlite().create_conversation().await.unwrap();
2005
2006 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2007 std::sync::Arc::new(memory),
2008 cid,
2009 50,
2010 5,
2011 100,
2012 );
2013
2014 let parts = vec![MessagePart::ToolUse {
2015 id: "call_abc123".to_string(),
2016 name: "read_file".to_string(),
2017 input: serde_json::json!({"path": "/tmp/test.txt"}),
2018 }];
2019 let content = "[tool_use: read_file(call_abc123)]";
2020
2021 agent
2022 .persist_message(Role::Assistant, content, &parts, false)
2023 .await;
2024
2025 let history = agent
2026 .memory_state
2027 .persistence
2028 .memory
2029 .as_ref()
2030 .unwrap()
2031 .sqlite()
2032 .load_history(cid, 50)
2033 .await
2034 .unwrap();
2035
2036 assert_eq!(history.len(), 1);
2037 assert_eq!(history[0].role, Role::Assistant);
2038 assert_eq!(history[0].content, content);
2039 assert_eq!(history[0].parts.len(), 1);
2040 match &history[0].parts[0] {
2041 MessagePart::ToolUse { id, name, .. } => {
2042 assert_eq!(id, "call_abc123");
2043 assert_eq!(name, "read_file");
2044 }
2045 other => panic!("expected ToolUse part, got {other:?}"),
2046 }
2047 assert!(
2049 !history[0]
2050 .parts
2051 .iter()
2052 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2053 "assistant message must not contain ToolResult parts"
2054 );
2055 }
2056
2057 #[tokio::test]
2058 async fn persist_message_saves_correct_tool_result_parts() {
2059 use zeph_llm::provider::MessagePart;
2060
2061 let provider = mock_provider(vec![]);
2062 let channel = MockChannel::new(vec![]);
2063 let registry = create_test_registry();
2064 let executor = MockToolExecutor::no_tools();
2065
2066 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2067 let cid = memory.sqlite().create_conversation().await.unwrap();
2068
2069 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2070 std::sync::Arc::new(memory),
2071 cid,
2072 50,
2073 5,
2074 100,
2075 );
2076
2077 let parts = vec![MessagePart::ToolResult {
2078 tool_use_id: "call_abc123".to_string(),
2079 content: "file contents here".to_string(),
2080 is_error: false,
2081 }];
2082 let content = "[tool_result: call_abc123]\nfile contents here";
2083
2084 agent
2085 .persist_message(Role::User, content, &parts, false)
2086 .await;
2087
2088 let history = agent
2089 .memory_state
2090 .persistence
2091 .memory
2092 .as_ref()
2093 .unwrap()
2094 .sqlite()
2095 .load_history(cid, 50)
2096 .await
2097 .unwrap();
2098
2099 assert_eq!(history.len(), 1);
2100 assert_eq!(history[0].role, Role::User);
2101 assert_eq!(history[0].content, content);
2102 assert_eq!(history[0].parts.len(), 1);
2103 match &history[0].parts[0] {
2104 MessagePart::ToolResult {
2105 tool_use_id,
2106 content: result_content,
2107 is_error,
2108 } => {
2109 assert_eq!(tool_use_id, "call_abc123");
2110 assert_eq!(result_content, "file contents here");
2111 assert!(!is_error);
2112 }
2113 other => panic!("expected ToolResult part, got {other:?}"),
2114 }
2115 assert!(
2117 !history[0]
2118 .parts
2119 .iter()
2120 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2121 "user ToolResult message must not contain ToolUse parts"
2122 );
2123 }
2124
2125 #[tokio::test]
2126 async fn persist_message_roundtrip_preserves_role_part_alignment() {
2127 use zeph_llm::provider::MessagePart;
2128
2129 let provider = mock_provider(vec![]);
2130 let channel = MockChannel::new(vec![]);
2131 let registry = create_test_registry();
2132 let executor = MockToolExecutor::no_tools();
2133
2134 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2135 let cid = memory.sqlite().create_conversation().await.unwrap();
2136
2137 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2138 std::sync::Arc::new(memory),
2139 cid,
2140 50,
2141 5,
2142 100,
2143 );
2144
2145 let assistant_parts = vec![MessagePart::ToolUse {
2147 id: "id_1".to_string(),
2148 name: "list_dir".to_string(),
2149 input: serde_json::json!({"path": "/tmp"}),
2150 }];
2151 agent
2152 .persist_message(
2153 Role::Assistant,
2154 "[tool_use: list_dir(id_1)]",
2155 &assistant_parts,
2156 false,
2157 )
2158 .await;
2159
2160 let user_parts = vec![MessagePart::ToolResult {
2162 tool_use_id: "id_1".to_string(),
2163 content: "file1.txt\nfile2.txt".to_string(),
2164 is_error: false,
2165 }];
2166 agent
2167 .persist_message(
2168 Role::User,
2169 "[tool_result: id_1]\nfile1.txt\nfile2.txt",
2170 &user_parts,
2171 false,
2172 )
2173 .await;
2174
2175 let history = agent
2176 .memory_state
2177 .persistence
2178 .memory
2179 .as_ref()
2180 .unwrap()
2181 .sqlite()
2182 .load_history(cid, 50)
2183 .await
2184 .unwrap();
2185
2186 assert_eq!(history.len(), 2);
2187
2188 assert_eq!(history[0].role, Role::Assistant);
2190 assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
2191 assert!(
2192 matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
2193 "first message must be assistant ToolUse"
2194 );
2195
2196 assert_eq!(history[1].role, Role::User);
2198 assert_eq!(
2199 history[1].content,
2200 "[tool_result: id_1]\nfile1.txt\nfile2.txt"
2201 );
2202 assert!(
2203 matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
2204 "second message must be user ToolResult"
2205 );
2206
2207 assert!(
2209 !history[0]
2210 .parts
2211 .iter()
2212 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2213 "assistant message must not have ToolResult parts"
2214 );
2215 assert!(
2216 !history[1]
2217 .parts
2218 .iter()
2219 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2220 "user message must not have ToolUse parts"
2221 );
2222 }
2223
2224 #[tokio::test]
2225 async fn persist_message_saves_correct_tool_output_parts() {
2226 use zeph_llm::provider::MessagePart;
2227
2228 let provider = mock_provider(vec![]);
2229 let channel = MockChannel::new(vec![]);
2230 let registry = create_test_registry();
2231 let executor = MockToolExecutor::no_tools();
2232
2233 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2234 let cid = memory.sqlite().create_conversation().await.unwrap();
2235
2236 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2237 std::sync::Arc::new(memory),
2238 cid,
2239 50,
2240 5,
2241 100,
2242 );
2243
2244 let parts = vec![MessagePart::ToolOutput {
2245 tool_name: "shell".into(),
2246 body: "hello from shell".to_string(),
2247 compacted_at: None,
2248 }];
2249 let content = "[tool: shell]\nhello from shell";
2250
2251 agent
2252 .persist_message(Role::User, content, &parts, false)
2253 .await;
2254
2255 let history = agent
2256 .memory_state
2257 .persistence
2258 .memory
2259 .as_ref()
2260 .unwrap()
2261 .sqlite()
2262 .load_history(cid, 50)
2263 .await
2264 .unwrap();
2265
2266 assert_eq!(history.len(), 1);
2267 assert_eq!(history[0].role, Role::User);
2268 assert_eq!(history[0].content, content);
2269 assert_eq!(history[0].parts.len(), 1);
2270 match &history[0].parts[0] {
2271 MessagePart::ToolOutput {
2272 tool_name,
2273 body,
2274 compacted_at,
2275 } => {
2276 assert_eq!(tool_name, "shell");
2277 assert_eq!(body, "hello from shell");
2278 assert!(compacted_at.is_none());
2279 }
2280 other => panic!("expected ToolOutput part, got {other:?}"),
2281 }
2282 }
2283
2284 #[tokio::test]
2287 async fn load_history_removes_trailing_orphan_tool_use() {
2288 use zeph_llm::provider::MessagePart;
2289
2290 let provider = mock_provider(vec![]);
2291 let channel = MockChannel::new(vec![]);
2292 let registry = create_test_registry();
2293 let executor = MockToolExecutor::no_tools();
2294
2295 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2296 let cid = memory.sqlite().create_conversation().await.unwrap();
2297 let sqlite = memory.sqlite();
2298
2299 sqlite
2301 .save_message(cid, "user", "do something with a tool")
2302 .await
2303 .unwrap();
2304
2305 let parts = serde_json::to_string(&[MessagePart::ToolUse {
2307 id: "call_orphan".to_string(),
2308 name: "shell".to_string(),
2309 input: serde_json::json!({"command": "ls"}),
2310 }])
2311 .unwrap();
2312 sqlite
2313 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2314 .await
2315 .unwrap();
2316
2317 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2318 std::sync::Arc::new(memory),
2319 cid,
2320 50,
2321 5,
2322 100,
2323 );
2324
2325 let messages_before = agent.msg.messages.len();
2326 agent.load_history().await.unwrap();
2327
2328 assert_eq!(
2330 agent.msg.messages.len(),
2331 messages_before + 1,
2332 "orphaned trailing tool_use must be removed"
2333 );
2334 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2335 }
2336
2337 #[tokio::test]
2338 async fn load_history_removes_leading_orphan_tool_result() {
2339 use zeph_llm::provider::MessagePart;
2340
2341 let provider = mock_provider(vec![]);
2342 let channel = MockChannel::new(vec![]);
2343 let registry = create_test_registry();
2344 let executor = MockToolExecutor::no_tools();
2345
2346 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2347 let cid = memory.sqlite().create_conversation().await.unwrap();
2348 let sqlite = memory.sqlite();
2349
2350 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2352 tool_use_id: "call_missing".to_string(),
2353 content: "result data".to_string(),
2354 is_error: false,
2355 }])
2356 .unwrap();
2357 sqlite
2358 .save_message_with_parts(
2359 cid,
2360 "user",
2361 "[tool_result: call_missing]\nresult data",
2362 &result_parts,
2363 )
2364 .await
2365 .unwrap();
2366
2367 sqlite
2369 .save_message(cid, "assistant", "here is my response")
2370 .await
2371 .unwrap();
2372
2373 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2374 std::sync::Arc::new(memory),
2375 cid,
2376 50,
2377 5,
2378 100,
2379 );
2380
2381 let messages_before = agent.msg.messages.len();
2382 agent.load_history().await.unwrap();
2383
2384 assert_eq!(
2386 agent.msg.messages.len(),
2387 messages_before + 1,
2388 "orphaned leading tool_result must be removed"
2389 );
2390 assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2391 }
2392
2393 #[tokio::test]
2394 async fn load_history_preserves_complete_tool_pairs() {
2395 use zeph_llm::provider::MessagePart;
2396
2397 let provider = mock_provider(vec![]);
2398 let channel = MockChannel::new(vec![]);
2399 let registry = create_test_registry();
2400 let executor = MockToolExecutor::no_tools();
2401
2402 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2403 let cid = memory.sqlite().create_conversation().await.unwrap();
2404 let sqlite = memory.sqlite();
2405
2406 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2408 id: "call_ok".to_string(),
2409 name: "shell".to_string(),
2410 input: serde_json::json!({"command": "pwd"}),
2411 }])
2412 .unwrap();
2413 sqlite
2414 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2415 .await
2416 .unwrap();
2417
2418 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2419 tool_use_id: "call_ok".to_string(),
2420 content: "/home/user".to_string(),
2421 is_error: false,
2422 }])
2423 .unwrap();
2424 sqlite
2425 .save_message_with_parts(
2426 cid,
2427 "user",
2428 "[tool_result: call_ok]\n/home/user",
2429 &result_parts,
2430 )
2431 .await
2432 .unwrap();
2433
2434 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2435 std::sync::Arc::new(memory),
2436 cid,
2437 50,
2438 5,
2439 100,
2440 );
2441
2442 let messages_before = agent.msg.messages.len();
2443 agent.load_history().await.unwrap();
2444
2445 assert_eq!(
2447 agent.msg.messages.len(),
2448 messages_before + 2,
2449 "complete tool_use/tool_result pair must be preserved"
2450 );
2451 assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2452 assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2453 }
2454
2455 #[tokio::test]
2456 async fn load_history_handles_multiple_trailing_orphans() {
2457 use zeph_llm::provider::MessagePart;
2458
2459 let provider = mock_provider(vec![]);
2460 let channel = MockChannel::new(vec![]);
2461 let registry = create_test_registry();
2462 let executor = MockToolExecutor::no_tools();
2463
2464 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2465 let cid = memory.sqlite().create_conversation().await.unwrap();
2466 let sqlite = memory.sqlite();
2467
2468 sqlite.save_message(cid, "user", "start").await.unwrap();
2470
2471 let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2473 id: "call_1".to_string(),
2474 name: "shell".to_string(),
2475 input: serde_json::json!({}),
2476 }])
2477 .unwrap();
2478 sqlite
2479 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2480 .await
2481 .unwrap();
2482
2483 let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2485 id: "call_2".to_string(),
2486 name: "read_file".to_string(),
2487 input: serde_json::json!({}),
2488 }])
2489 .unwrap();
2490 sqlite
2491 .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2492 .await
2493 .unwrap();
2494
2495 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2496 std::sync::Arc::new(memory),
2497 cid,
2498 50,
2499 5,
2500 100,
2501 );
2502
2503 let messages_before = agent.msg.messages.len();
2504 agent.load_history().await.unwrap();
2505
2506 assert_eq!(
2508 agent.msg.messages.len(),
2509 messages_before + 1,
2510 "all trailing orphaned tool_use messages must be removed"
2511 );
2512 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2513 }
2514
2515 #[tokio::test]
2516 async fn load_history_no_tool_messages_unchanged() {
2517 let provider = mock_provider(vec![]);
2518 let channel = MockChannel::new(vec![]);
2519 let registry = create_test_registry();
2520 let executor = MockToolExecutor::no_tools();
2521
2522 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2523 let cid = memory.sqlite().create_conversation().await.unwrap();
2524 let sqlite = memory.sqlite();
2525
2526 sqlite.save_message(cid, "user", "hello").await.unwrap();
2527 sqlite
2528 .save_message(cid, "assistant", "hi there")
2529 .await
2530 .unwrap();
2531 sqlite
2532 .save_message(cid, "user", "how are you?")
2533 .await
2534 .unwrap();
2535
2536 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2537 std::sync::Arc::new(memory),
2538 cid,
2539 50,
2540 5,
2541 100,
2542 );
2543
2544 let messages_before = agent.msg.messages.len();
2545 agent.load_history().await.unwrap();
2546
2547 assert_eq!(
2549 agent.msg.messages.len(),
2550 messages_before + 3,
2551 "plain messages without tool parts must pass through unchanged"
2552 );
2553 }
2554
2555 #[tokio::test]
2556 async fn load_history_removes_both_leading_and_trailing_orphans() {
2557 use zeph_llm::provider::MessagePart;
2558
2559 let provider = mock_provider(vec![]);
2560 let channel = MockChannel::new(vec![]);
2561 let registry = create_test_registry();
2562 let executor = MockToolExecutor::no_tools();
2563
2564 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2565 let cid = memory.sqlite().create_conversation().await.unwrap();
2566 let sqlite = memory.sqlite();
2567
2568 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2570 tool_use_id: "call_leading".to_string(),
2571 content: "orphaned result".to_string(),
2572 is_error: false,
2573 }])
2574 .unwrap();
2575 sqlite
2576 .save_message_with_parts(
2577 cid,
2578 "user",
2579 "[tool_result: call_leading]\norphaned result",
2580 &result_parts,
2581 )
2582 .await
2583 .unwrap();
2584
2585 sqlite
2587 .save_message(cid, "user", "what is 2+2?")
2588 .await
2589 .unwrap();
2590 sqlite.save_message(cid, "assistant", "4").await.unwrap();
2591
2592 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2594 id: "call_trailing".to_string(),
2595 name: "shell".to_string(),
2596 input: serde_json::json!({"command": "date"}),
2597 }])
2598 .unwrap();
2599 sqlite
2600 .save_message_with_parts(
2601 cid,
2602 "assistant",
2603 "[tool_use: shell(call_trailing)]",
2604 &use_parts,
2605 )
2606 .await
2607 .unwrap();
2608
2609 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2610 std::sync::Arc::new(memory),
2611 cid,
2612 50,
2613 5,
2614 100,
2615 );
2616
2617 let messages_before = agent.msg.messages.len();
2618 agent.load_history().await.unwrap();
2619
2620 assert_eq!(
2622 agent.msg.messages.len(),
2623 messages_before + 2,
2624 "both leading and trailing orphans must be removed"
2625 );
2626 assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2627 assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2628 assert_eq!(
2629 agent.msg.messages[messages_before + 1].role,
2630 Role::Assistant
2631 );
2632 assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2633 }
2634
2635 #[tokio::test]
2640 async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2641 use zeph_llm::provider::MessagePart;
2642
2643 let provider = mock_provider(vec![]);
2644 let channel = MockChannel::new(vec![]);
2645 let registry = create_test_registry();
2646 let executor = MockToolExecutor::no_tools();
2647
2648 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2649 let cid = memory.sqlite().create_conversation().await.unwrap();
2650 let sqlite = memory.sqlite();
2651
2652 sqlite
2654 .save_message(cid, "user", "first question")
2655 .await
2656 .unwrap();
2657 sqlite
2658 .save_message(cid, "assistant", "first answer")
2659 .await
2660 .unwrap();
2661
2662 let use_parts = serde_json::to_string(&[
2666 MessagePart::ToolUse {
2667 id: "call_mid_1".to_string(),
2668 name: "shell".to_string(),
2669 input: serde_json::json!({"command": "ls"}),
2670 },
2671 MessagePart::Text {
2672 text: "Let me check the files.".to_string(),
2673 },
2674 ])
2675 .unwrap();
2676 sqlite
2677 .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2678 .await
2679 .unwrap();
2680
2681 sqlite
2683 .save_message(cid, "user", "second question")
2684 .await
2685 .unwrap();
2686 sqlite
2687 .save_message(cid, "assistant", "second answer")
2688 .await
2689 .unwrap();
2690
2691 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2692 std::sync::Arc::new(memory),
2693 cid,
2694 50,
2695 5,
2696 100,
2697 );
2698
2699 let messages_before = agent.msg.messages.len();
2700 agent.load_history().await.unwrap();
2701
2702 assert_eq!(
2705 agent.msg.messages.len(),
2706 messages_before + 5,
2707 "message count must be 5 (orphan message kept — has text content)"
2708 );
2709
2710 let orphan = &agent.msg.messages[messages_before + 2];
2712 assert_eq!(orphan.role, Role::Assistant);
2713 assert!(
2714 !orphan
2715 .parts
2716 .iter()
2717 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2718 "orphaned ToolUse parts must be stripped from mid-history message"
2719 );
2720 assert!(
2722 orphan.parts.iter().any(
2723 |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2724 ),
2725 "text content of orphaned assistant message must be preserved"
2726 );
2727 }
2728
2729 #[tokio::test]
2734 async fn load_history_keeps_tool_only_user_message() {
2735 use zeph_llm::provider::MessagePart;
2736
2737 let provider = mock_provider(vec![]);
2738 let channel = MockChannel::new(vec![]);
2739 let registry = create_test_registry();
2740 let executor = MockToolExecutor::no_tools();
2741
2742 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2743 let cid = memory.sqlite().create_conversation().await.unwrap();
2744 let sqlite = memory.sqlite();
2745
2746 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2748 id: "call_rc3".to_string(),
2749 name: "memory_save".to_string(),
2750 input: serde_json::json!({"content": "something"}),
2751 }])
2752 .unwrap();
2753 sqlite
2754 .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2755 .await
2756 .unwrap();
2757
2758 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2760 tool_use_id: "call_rc3".to_string(),
2761 content: "saved".to_string(),
2762 is_error: false,
2763 }])
2764 .unwrap();
2765 sqlite
2766 .save_message_with_parts(cid, "user", "", &result_parts)
2767 .await
2768 .unwrap();
2769
2770 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2771
2772 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2773 std::sync::Arc::new(memory),
2774 cid,
2775 50,
2776 5,
2777 100,
2778 );
2779
2780 let messages_before = agent.msg.messages.len();
2781 agent.load_history().await.unwrap();
2782
2783 assert_eq!(
2786 agent.msg.messages.len(),
2787 messages_before + 3,
2788 "user message with empty content but ToolResult parts must not be dropped"
2789 );
2790
2791 let user_msg = &agent.msg.messages[messages_before + 1];
2793 assert_eq!(user_msg.role, Role::User);
2794 assert!(
2795 user_msg.parts.iter().any(
2796 |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2797 ),
2798 "ToolResult part must be preserved on user message with empty content"
2799 );
2800 }
2801
2802 #[tokio::test]
2806 async fn strip_orphans_removes_orphaned_tool_result() {
2807 use zeph_llm::provider::MessagePart;
2808
2809 let provider = mock_provider(vec![]);
2810 let channel = MockChannel::new(vec![]);
2811 let registry = create_test_registry();
2812 let executor = MockToolExecutor::no_tools();
2813
2814 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2815 let cid = memory.sqlite().create_conversation().await.unwrap();
2816 let sqlite = memory.sqlite();
2817
2818 sqlite.save_message(cid, "user", "hello").await.unwrap();
2820 sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2821
2822 sqlite
2824 .save_message(cid, "assistant", "plain answer")
2825 .await
2826 .unwrap();
2827
2828 let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2830 tool_use_id: "call_nonexistent".to_string(),
2831 content: "stale result".to_string(),
2832 is_error: false,
2833 }])
2834 .unwrap();
2835 sqlite
2836 .save_message_with_parts(
2837 cid,
2838 "user",
2839 "[tool_result: call_nonexistent]\nstale result",
2840 &orphan_result_parts,
2841 )
2842 .await
2843 .unwrap();
2844
2845 sqlite
2846 .save_message(cid, "assistant", "final")
2847 .await
2848 .unwrap();
2849
2850 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2851 std::sync::Arc::new(memory),
2852 cid,
2853 50,
2854 5,
2855 100,
2856 );
2857
2858 let messages_before = agent.msg.messages.len();
2859 agent.load_history().await.unwrap();
2860
2861 let loaded = &agent.msg.messages[messages_before..];
2865 for msg in loaded {
2866 assert!(
2867 !msg.parts.iter().any(|p| matches!(
2868 p,
2869 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2870 )),
2871 "orphaned ToolResult part must be stripped from history"
2872 );
2873 }
2874 }
2875
2876 #[tokio::test]
2879 async fn strip_orphans_keeps_complete_pair() {
2880 use zeph_llm::provider::MessagePart;
2881
2882 let provider = mock_provider(vec![]);
2883 let channel = MockChannel::new(vec![]);
2884 let registry = create_test_registry();
2885 let executor = MockToolExecutor::no_tools();
2886
2887 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2888 let cid = memory.sqlite().create_conversation().await.unwrap();
2889 let sqlite = memory.sqlite();
2890
2891 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2892 id: "call_valid".to_string(),
2893 name: "shell".to_string(),
2894 input: serde_json::json!({"command": "ls"}),
2895 }])
2896 .unwrap();
2897 sqlite
2898 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2899 .await
2900 .unwrap();
2901
2902 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2903 tool_use_id: "call_valid".to_string(),
2904 content: "file.rs".to_string(),
2905 is_error: false,
2906 }])
2907 .unwrap();
2908 sqlite
2909 .save_message_with_parts(cid, "user", "", &result_parts)
2910 .await
2911 .unwrap();
2912
2913 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2914 std::sync::Arc::new(memory),
2915 cid,
2916 50,
2917 5,
2918 100,
2919 );
2920
2921 let messages_before = agent.msg.messages.len();
2922 agent.load_history().await.unwrap();
2923
2924 assert_eq!(
2925 agent.msg.messages.len(),
2926 messages_before + 2,
2927 "complete tool_use/tool_result pair must be preserved"
2928 );
2929
2930 let user_msg = &agent.msg.messages[messages_before + 1];
2931 assert!(
2932 user_msg.parts.iter().any(|p| matches!(
2933 p,
2934 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2935 )),
2936 "ToolResult part for a matched tool_use must not be stripped"
2937 );
2938 }
2939
2940 #[tokio::test]
2943 async fn strip_orphans_mixed_history() {
2944 use zeph_llm::provider::MessagePart;
2945
2946 let provider = mock_provider(vec![]);
2947 let channel = MockChannel::new(vec![]);
2948 let registry = create_test_registry();
2949 let executor = MockToolExecutor::no_tools();
2950
2951 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2952 let cid = memory.sqlite().create_conversation().await.unwrap();
2953 let sqlite = memory.sqlite();
2954
2955 let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2957 id: "call_good".to_string(),
2958 name: "shell".to_string(),
2959 input: serde_json::json!({"command": "pwd"}),
2960 }])
2961 .unwrap();
2962 sqlite
2963 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2964 .await
2965 .unwrap();
2966
2967 let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2968 tool_use_id: "call_good".to_string(),
2969 content: "/home".to_string(),
2970 is_error: false,
2971 }])
2972 .unwrap();
2973 sqlite
2974 .save_message_with_parts(cid, "user", "", &result_parts_ok)
2975 .await
2976 .unwrap();
2977
2978 sqlite
2980 .save_message(cid, "assistant", "text only")
2981 .await
2982 .unwrap();
2983
2984 let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2985 tool_use_id: "call_ghost".to_string(),
2986 content: "ghost result".to_string(),
2987 is_error: false,
2988 }])
2989 .unwrap();
2990 sqlite
2991 .save_message_with_parts(
2992 cid,
2993 "user",
2994 "[tool_result: call_ghost]\nghost result",
2995 &orphan_parts,
2996 )
2997 .await
2998 .unwrap();
2999
3000 sqlite
3001 .save_message(cid, "assistant", "final reply")
3002 .await
3003 .unwrap();
3004
3005 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3006 std::sync::Arc::new(memory),
3007 cid,
3008 50,
3009 5,
3010 100,
3011 );
3012
3013 let messages_before = agent.msg.messages.len();
3014 agent.load_history().await.unwrap();
3015
3016 let loaded = &agent.msg.messages[messages_before..];
3017
3018 for msg in loaded {
3020 assert!(
3021 !msg.parts.iter().any(|p| matches!(
3022 p,
3023 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
3024 )),
3025 "orphaned ToolResult (call_ghost) must be stripped from history"
3026 );
3027 }
3028
3029 let has_good_result = loaded.iter().any(|msg| {
3032 msg.role == Role::User
3033 && msg.parts.iter().any(|p| {
3034 matches!(
3035 p,
3036 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
3037 )
3038 })
3039 });
3040 assert!(
3041 has_good_result,
3042 "matched ToolResult (call_good) must be preserved in history"
3043 );
3044 }
3045
3046 #[tokio::test]
3049 async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
3050 use zeph_llm::provider::MessagePart;
3051
3052 let provider = mock_provider(vec![]);
3053 let channel = MockChannel::new(vec![]);
3054 let registry = create_test_registry();
3055 let executor = MockToolExecutor::no_tools();
3056
3057 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3058 let cid = memory.sqlite().create_conversation().await.unwrap();
3059 let sqlite = memory.sqlite();
3060
3061 sqlite
3062 .save_message(cid, "user", "run a command")
3063 .await
3064 .unwrap();
3065
3066 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3068 id: "call_ok".to_string(),
3069 name: "shell".to_string(),
3070 input: serde_json::json!({"command": "echo hi"}),
3071 }])
3072 .unwrap();
3073 sqlite
3074 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
3075 .await
3076 .unwrap();
3077
3078 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3080 tool_use_id: "call_ok".to_string(),
3081 content: "hi".to_string(),
3082 is_error: false,
3083 }])
3084 .unwrap();
3085 sqlite
3086 .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
3087 .await
3088 .unwrap();
3089
3090 sqlite.save_message(cid, "assistant", "done").await.unwrap();
3091
3092 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3093 std::sync::Arc::new(memory),
3094 cid,
3095 50,
3096 5,
3097 100,
3098 );
3099
3100 let messages_before = agent.msg.messages.len();
3101 agent.load_history().await.unwrap();
3102
3103 assert_eq!(
3105 agent.msg.messages.len(),
3106 messages_before + 4,
3107 "matched tool pair must not be removed"
3108 );
3109 let tool_msg = &agent.msg.messages[messages_before + 1];
3110 assert!(
3111 tool_msg
3112 .parts
3113 .iter()
3114 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
3115 "matched ToolUse parts must be preserved"
3116 );
3117 }
3118
3119 #[tokio::test]
3123 async fn persist_cancelled_tool_results_pairs_tool_use() {
3124 use zeph_llm::provider::MessagePart;
3125
3126 let provider = mock_provider(vec![]);
3127 let channel = MockChannel::new(vec![]);
3128 let registry = create_test_registry();
3129 let executor = MockToolExecutor::no_tools();
3130
3131 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3132 let cid = memory.sqlite().create_conversation().await.unwrap();
3133
3134 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3135 std::sync::Arc::new(memory),
3136 cid,
3137 50,
3138 5,
3139 100,
3140 );
3141
3142 let tool_calls = vec![
3144 zeph_llm::provider::ToolUseRequest {
3145 id: "cancel_id_1".to_string(),
3146 name: "shell".to_string().into(),
3147 input: serde_json::json!({}),
3148 },
3149 zeph_llm::provider::ToolUseRequest {
3150 id: "cancel_id_2".to_string(),
3151 name: "read_file".to_string().into(),
3152 input: serde_json::json!({}),
3153 },
3154 ];
3155
3156 agent.persist_cancelled_tool_results(&tool_calls).await;
3157
3158 let history = agent
3159 .memory_state
3160 .persistence
3161 .memory
3162 .as_ref()
3163 .unwrap()
3164 .sqlite()
3165 .load_history(cid, 50)
3166 .await
3167 .unwrap();
3168
3169 assert_eq!(history.len(), 1);
3171 assert_eq!(history[0].role, Role::User);
3172
3173 for tc in &tool_calls {
3175 assert!(
3176 history[0].parts.iter().any(|p| matches!(
3177 p,
3178 MessagePart::ToolResult { tool_use_id, is_error, .. }
3179 if tool_use_id == &tc.id && *is_error
3180 )),
3181 "tombstone ToolResult for {} must be present and is_error=true",
3182 tc.id
3183 );
3184 }
3185 }
3186
/// The empty string is not meaningful content.
#[test]
fn meaningful_content_empty_string() {
    let input = "";
    assert!(!has_meaningful_content(input));
}
3193
/// Spaces, newlines and tabs alone are not meaningful content.
#[test]
fn meaningful_content_whitespace_only() {
    let input = " \n\t ";
    assert!(!has_meaningful_content(input));
}
3198
/// A lone `[tool_use: name(id)]` tag is not meaningful content.
#[test]
fn meaningful_content_tool_use_only() {
    let input = "[tool_use: shell(call_1)]";
    assert!(!has_meaningful_content(input));
}
3203
/// A `[tool_use: name]` tag without a parenthesised id is not meaningful
/// content either.
#[test]
fn meaningful_content_tool_use_no_parens() {
    let input = "[tool_use: memory_save]";
    assert!(!has_meaningful_content(input));
}
3209
/// A `[tool_result: id]` tag followed by an output body is not meaningful
/// content.
#[test]
fn meaningful_content_tool_result_with_body() {
    let input = "[tool_result: call_1]\nsome output here";
    assert!(!has_meaningful_content(input));
}
3216
/// A `[tool_result: id]` tag followed only by a newline is not meaningful
/// content.
#[test]
fn meaningful_content_tool_result_empty_body() {
    let input = "[tool_result: call_1]\n";
    assert!(!has_meaningful_content(input));
}
3221
/// A `[tool output: name]` tag with its result on the same line is not
/// meaningful content.
#[test]
fn meaningful_content_tool_output_inline() {
    let input = "[tool output: bash] some result";
    assert!(!has_meaningful_content(input));
}
3226
/// A `[tool output: name]` tag followed by the `(pruned)` marker is not
/// meaningful content.
#[test]
fn meaningful_content_tool_output_pruned() {
    let input = "[tool output: bash] (pruned)";
    assert!(!has_meaningful_content(input));
}
3231
/// A `[tool output: name]` tag followed by a fenced block is not meaningful
/// content.
#[test]
fn meaningful_content_tool_output_fenced() {
    let input = "[tool output: bash]\n```\nls output\n```";
    assert!(!has_meaningful_content(input));
}
3238
/// Two adjacent `[tool_use: …]` tags with nothing between them are not
/// meaningful content.
#[test]
fn meaningful_content_multiple_tool_use_tags() {
    let input = "[tool_use: bash(id1)][tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3245
/// Two `[tool_use: …]` tags separated by a single space are not meaningful
/// content.
#[test]
fn meaningful_content_multiple_tool_use_tags_space_separator() {
    let input = "[tool_use: bash(id1)] [tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3253
/// Two `[tool_use: …]` tags separated by a newline are not meaningful
/// content.
#[test]
fn meaningful_content_multiple_tool_use_tags_newline_separator() {
    let input = "[tool_use: bash(id1)]\n[tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3261
/// A `[tool_result: …]` tag with its body, followed by a `[tool_use: …]` tag,
/// is not meaningful content.
#[test]
fn meaningful_content_tool_result_followed_by_tool_use() {
    let input = "[tool_result: call_1]\nresult\n[tool_use: bash(call_2)]";
    assert!(!has_meaningful_content(input));
}
3269
/// Ordinary prose without any tool tag is meaningful content.
#[test]
fn meaningful_content_real_text_only() {
    let input = "Hello, how can I help you?";
    assert!(has_meaningful_content(input));
}
3274
/// Prose placed before a `[tool_use: …]` tag makes the content meaningful.
#[test]
fn meaningful_content_text_before_tool_tag() {
    let input = "Let me check. [tool_use: bash(id)]";
    assert!(has_meaningful_content(input));
}
3279
/// Prose placed after a `[tool_use: …]` tag makes the content meaningful.
#[test]
fn meaningful_content_text_after_tool_use_tag() {
    let input = "[tool_use: bash] I ran the command";
    assert!(has_meaningful_content(input));
}
3287
/// Prose sandwiched between two `[tool_use: …]` tags makes the content
/// meaningful.
#[test]
fn meaningful_content_text_between_tags() {
    let input = "[tool_use: bash(id1)]\nand then\n[tool_use: read(id2)]";
    assert!(has_meaningful_content(input));
}
3294
/// A tag opener that is never closed with `]` is reported as meaningful
/// content.
#[test]
fn meaningful_content_malformed_tag_no_closing_bracket() {
    let input = "[tool_use: ";
    assert!(has_meaningful_content(input));
}
3300
/// A `[tool_use: …]` tag followed by a `[tool_result: …]` tag and its body,
/// with no other text, is not meaningful content.
#[test]
fn meaningful_content_tool_use_and_tool_result_only() {
    let input = "[tool_use: memory_save(call_abc)]\n[tool_result: call_abc]\nsaved";
    assert!(!has_meaningful_content(input));
}
3308
/// A `[tool_result: …]` body that itself starts with `[` (a JSON array) is
/// still not meaningful content.
#[test]
fn meaningful_content_tool_result_body_with_json_array() {
    let input = "[tool_result: id1]\n[\"array\", \"value\"]";
    assert!(!has_meaningful_content(input));
}
3315
3316 #[tokio::test]
3327 async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
3328 use zeph_llm::provider::MessagePart;
3329
3330 let provider = mock_provider(vec![]);
3331 let channel = MockChannel::new(vec![]);
3332 let registry = create_test_registry();
3333 let executor = MockToolExecutor::no_tools();
3334
3335 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3336 let cid = memory.sqlite().create_conversation().await.unwrap();
3337 let sqlite = memory.sqlite();
3338
3339 sqlite
3341 .save_message(cid, "user", "save this for me")
3342 .await
3343 .unwrap();
3344
3345 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3348 id: "call_2529".to_string(),
3349 name: "memory_save".to_string(),
3350 input: serde_json::json!({"content": "save this"}),
3351 }])
3352 .unwrap();
3353 let orphan_assistant_id = sqlite
3354 .save_message_with_parts(
3355 cid,
3356 "assistant",
3357 "[tool_use: memory_save(call_2529)]",
3358 &use_parts,
3359 )
3360 .await
3361 .unwrap();
3362
3363 sqlite
3368 .save_message(cid, "assistant", "here is a plain reply")
3369 .await
3370 .unwrap();
3371
3372 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3373 tool_use_id: "call_2529".to_string(),
3374 content: "saved".to_string(),
3375 is_error: false,
3376 }])
3377 .unwrap();
3378 let orphan_user_id = sqlite
3379 .save_message_with_parts(
3380 cid,
3381 "user",
3382 "[tool_result: call_2529]\nsaved",
3383 &result_parts,
3384 )
3385 .await
3386 .unwrap();
3387
3388 sqlite.save_message(cid, "assistant", "done").await.unwrap();
3390
3391 let memory_arc = std::sync::Arc::new(memory);
3392 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3393 memory_arc.clone(),
3394 cid,
3395 50,
3396 5,
3397 100,
3398 );
3399
3400 agent.load_history().await.unwrap();
3401
3402 let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3405 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3406 )
3407 .bind(orphan_assistant_id)
3408 .fetch_all(memory_arc.sqlite().pool())
3409 .await
3410 .unwrap();
3411
3412 let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3413 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3414 )
3415 .bind(orphan_user_id)
3416 .fetch_all(memory_arc.sqlite().pool())
3417 .await
3418 .unwrap();
3419
3420 assert_eq!(
3421 assistant_deleted_count.first().copied().unwrap_or(0),
3422 1,
3423 "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3424 );
3425 assert_eq!(
3426 user_deleted_count.first().copied().unwrap_or(0),
3427 1,
3428 "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3429 );
3430 }
3431
3432 #[tokio::test]
3436 async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3437 use zeph_llm::provider::MessagePart;
3438
3439 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3440 let cid = memory.sqlite().create_conversation().await.unwrap();
3441 let sqlite = memory.sqlite();
3442
3443 sqlite
3445 .save_message(cid, "user", "do something")
3446 .await
3447 .unwrap();
3448
3449 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3451 id: "call_idem".to_string(),
3452 name: "shell".to_string(),
3453 input: serde_json::json!({"command": "ls"}),
3454 }])
3455 .unwrap();
3456 sqlite
3457 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3458 .await
3459 .unwrap();
3460
3461 sqlite
3463 .save_message(cid, "assistant", "continuing")
3464 .await
3465 .unwrap();
3466
3467 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3469 tool_use_id: "call_idem".to_string(),
3470 content: "output".to_string(),
3471 is_error: false,
3472 }])
3473 .unwrap();
3474 sqlite
3475 .save_message_with_parts(
3476 cid,
3477 "user",
3478 "[tool_result: call_idem]\noutput",
3479 &result_parts,
3480 )
3481 .await
3482 .unwrap();
3483
3484 sqlite
3485 .save_message(cid, "assistant", "final")
3486 .await
3487 .unwrap();
3488
3489 let memory_arc = std::sync::Arc::new(memory);
3490
3491 let mut agent1 = Agent::new(
3493 mock_provider(vec![]),
3494 MockChannel::new(vec![]),
3495 create_test_registry(),
3496 None,
3497 5,
3498 MockToolExecutor::no_tools(),
3499 )
3500 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3501 agent1.load_history().await.unwrap();
3502 let count_after_first = agent1.msg.messages.len();
3503
3504 let mut agent2 = Agent::new(
3507 mock_provider(vec![]),
3508 MockChannel::new(vec![]),
3509 create_test_registry(),
3510 None,
3511 5,
3512 MockToolExecutor::no_tools(),
3513 )
3514 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3515 agent2.load_history().await.unwrap();
3516 let count_after_second = agent2.msg.messages.len();
3517
3518 assert_eq!(
3520 count_after_first, count_after_second,
3521 "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3522 );
3523 }
3524
3525 #[tokio::test]
3529 async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3530 use zeph_llm::provider::MessagePart;
3531
3532 let provider = mock_provider(vec![]);
3533 let channel = MockChannel::new(vec![]);
3534 let registry = create_test_registry();
3535 let executor = MockToolExecutor::no_tools();
3536
3537 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3538 let cid = memory.sqlite().create_conversation().await.unwrap();
3539 let sqlite = memory.sqlite();
3540
3541 sqlite
3543 .save_message(cid, "user", "check the files")
3544 .await
3545 .unwrap();
3546
3547 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3550 id: "call_mixed".to_string(),
3551 name: "shell".to_string(),
3552 input: serde_json::json!({"command": "ls"}),
3553 }])
3554 .unwrap();
3555 sqlite
3556 .save_message_with_parts(
3557 cid,
3558 "assistant",
3559 "Let me list the directory. [tool_use: shell(call_mixed)]",
3560 &use_parts,
3561 )
3562 .await
3563 .unwrap();
3564
3565 sqlite.save_message(cid, "user", "thanks").await.unwrap();
3567 sqlite
3568 .save_message(cid, "assistant", "you are welcome")
3569 .await
3570 .unwrap();
3571
3572 let memory_arc = std::sync::Arc::new(memory);
3573 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3574 memory_arc.clone(),
3575 cid,
3576 50,
3577 5,
3578 100,
3579 );
3580
3581 let messages_before = agent.msg.messages.len();
3582 agent.load_history().await.unwrap();
3583
3584 assert_eq!(
3586 agent.msg.messages.len(),
3587 messages_before + 4,
3588 "assistant message with text + tool tag must not be removed after ToolUse strip"
3589 );
3590
3591 let mixed_msg = agent
3593 .msg
3594 .messages
3595 .iter()
3596 .find(|m| m.content.contains("Let me list the directory"))
3597 .expect("mixed-content assistant message must still be in history");
3598 assert!(
3599 !mixed_msg
3600 .parts
3601 .iter()
3602 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3603 "orphaned ToolUse parts must be stripped even when message has meaningful text"
3604 );
3605 assert_eq!(
3606 mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3607 "content field must be unchanged — only parts are stripped"
3608 );
3609 }
3610
3611 #[tokio::test]
3614 async fn persist_message_skipped_tool_result_does_not_embed() {
3615 use zeph_llm::provider::MessagePart;
3616
3617 let provider = mock_provider(vec![]);
3618 let channel = MockChannel::new(vec![]);
3619 let registry = create_test_registry();
3620 let executor = MockToolExecutor::no_tools();
3621
3622 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3623 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3624 let cid = memory.sqlite().create_conversation().await.unwrap();
3625
3626 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3627 .with_metrics(tx)
3628 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3629 agent.memory_state.persistence.autosave_assistant = true;
3630 agent.memory_state.persistence.autosave_min_length = 0;
3631
3632 let parts = vec![MessagePart::ToolResult {
3633 tool_use_id: "tu1".into(),
3634 content: "[skipped] bash tool was blocked by utility gate".into(),
3635 is_error: false,
3636 }];
3637
3638 agent
3639 .persist_message(
3640 Role::User,
3641 "[skipped] bash tool was blocked by utility gate",
3642 &parts,
3643 false,
3644 )
3645 .await;
3646
3647 assert_eq!(
3648 rx.borrow().embeddings_generated,
3649 0,
3650 "[skipped] ToolResult must not be embedded into Qdrant"
3651 );
3652 }
3653
3654 #[tokio::test]
3655 async fn persist_message_stopped_tool_result_does_not_embed() {
3656 use zeph_llm::provider::MessagePart;
3657
3658 let provider = mock_provider(vec![]);
3659 let channel = MockChannel::new(vec![]);
3660 let registry = create_test_registry();
3661 let executor = MockToolExecutor::no_tools();
3662
3663 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3664 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3665 let cid = memory.sqlite().create_conversation().await.unwrap();
3666
3667 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3668 .with_metrics(tx)
3669 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3670 agent.memory_state.persistence.autosave_assistant = true;
3671 agent.memory_state.persistence.autosave_min_length = 0;
3672
3673 let parts = vec![MessagePart::ToolResult {
3674 tool_use_id: "tu2".into(),
3675 content: "[stopped] execution limit reached".into(),
3676 is_error: false,
3677 }];
3678
3679 agent
3680 .persist_message(
3681 Role::User,
3682 "[stopped] execution limit reached",
3683 &parts,
3684 false,
3685 )
3686 .await;
3687
3688 assert_eq!(
3689 rx.borrow().embeddings_generated,
3690 0,
3691 "[stopped] ToolResult must not be embedded into Qdrant"
3692 );
3693 }
3694
3695 #[tokio::test]
3696 async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3697 use zeph_llm::provider::MessagePart;
3700
3701 let provider = mock_provider(vec![]);
3702 let channel = MockChannel::new(vec![]);
3703 let registry = create_test_registry();
3704 let executor = MockToolExecutor::no_tools();
3705
3706 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3707 let cid = memory.sqlite().create_conversation().await.unwrap();
3708 let memory_arc = std::sync::Arc::new(memory);
3709
3710 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3711 memory_arc.clone(),
3712 cid,
3713 50,
3714 5,
3715 100,
3716 );
3717 agent.memory_state.persistence.autosave_assistant = true;
3718 agent.memory_state.persistence.autosave_min_length = 0;
3719
3720 let content = "total 42\ndrwxr-xr-x 5 user group";
3721 let parts = vec![MessagePart::ToolResult {
3722 tool_use_id: "tu3".into(),
3723 content: content.into(),
3724 is_error: false,
3725 }];
3726
3727 agent
3728 .persist_message(Role::User, content, &parts, false)
3729 .await;
3730
3731 let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3733 assert_eq!(
3734 history.len(),
3735 1,
3736 "normal ToolResult must be saved to SQLite"
3737 );
3738 assert_eq!(history[0].content, content);
3739 }
3740
3741 #[test]
3746 fn trajectory_extraction_slice_bounds_messages() {
3747 let max_messages: usize = 20;
3749 let total_messages = 100usize;
3750
3751 let tail_start = total_messages.saturating_sub(max_messages);
3752 let window = total_messages - tail_start;
3753
3754 assert_eq!(
3755 window, 20,
3756 "slice should contain exactly max_messages items"
3757 );
3758 assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3759 }
3760
3761 #[test]
3762 fn trajectory_extraction_slice_handles_few_messages() {
3763 let max_messages: usize = 20;
3764 let total_messages = 5usize;
3765
3766 let tail_start = total_messages.saturating_sub(max_messages);
3767 let window = total_messages - tail_start;
3768
3769 assert_eq!(window, 5, "should return all messages when fewer than max");
3770 assert_eq!(tail_start, 0, "slice should start from the beginning");
3771 }
3772
3773 #[tokio::test]
3779 async fn regression_3168_complete_tool_pair_survives_round_trip() {
3780 use zeph_llm::provider::MessagePart;
3781
3782 let provider = mock_provider(vec![]);
3783 let channel = MockChannel::new(vec![]);
3784 let registry = create_test_registry();
3785 let executor = MockToolExecutor::no_tools();
3786
3787 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3788 let cid = memory.sqlite().create_conversation().await.unwrap();
3789 let sqlite = memory.sqlite();
3790
3791 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3792 id: "r3168_call".to_string(),
3793 name: "shell".to_string(),
3794 input: serde_json::json!({"command": "echo hi"}),
3795 }])
3796 .unwrap();
3797 sqlite
3798 .save_message_with_parts(
3799 cid,
3800 "assistant",
3801 "[tool_use: shell(r3168_call)]",
3802 &use_parts,
3803 )
3804 .await
3805 .unwrap();
3806
3807 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3808 tool_use_id: "r3168_call".to_string(),
3809 content: "[skipped]".to_string(),
3810 is_error: false,
3811 }])
3812 .unwrap();
3813 sqlite
3814 .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3815 .await
3816 .unwrap();
3817
3818 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3819 std::sync::Arc::new(memory),
3820 cid,
3821 50,
3822 5,
3823 100,
3824 );
3825
3826 let base = agent.msg.messages.len();
3827 agent.load_history().await.unwrap();
3828
3829 assert_eq!(
3830 agent.msg.messages.len(),
3831 base + 2,
3832 "both messages of the complete pair must survive load_history"
3833 );
3834
3835 let assistant_msg = agent
3836 .msg
3837 .messages
3838 .iter()
3839 .find(|m| m.role == Role::Assistant)
3840 .expect("assistant message missing after load_history");
3841 assert!(
3842 assistant_msg
3843 .parts
3844 .iter()
3845 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3846 "ToolUse part must be preserved in assistant message"
3847 );
3848
3849 let user_msg = agent
3850 .msg
3851 .messages
3852 .iter()
3853 .rev()
3854 .find(|m| m.role == Role::User)
3855 .expect("user message missing after load_history");
3856 assert!(
3857 user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3858 "ToolResult part must be preserved in user message"
3859 );
3860 }
3861
3862 #[test]
3865 fn regression_3168_system_between_tool_pair_not_stripped() {
3866 use zeph_llm::provider::{MessageMetadata, MessagePart};
3867
3868 let tool_id = "call_sys_between".to_string();
3869
3870 let mut messages = vec![
3871 Message {
3872 role: Role::Assistant,
3873 content: "[tool_use: shell(call_sys_between)]".to_string(),
3874 parts: vec![MessagePart::ToolUse {
3875 id: tool_id.clone(),
3876 name: "shell".to_string(),
3877 input: serde_json::json!({"command": "ls"}),
3878 }],
3879 metadata: MessageMetadata::default(),
3880 },
3881 Message {
3882 role: Role::System,
3883 content: "system hint injected between tool calls".to_string(),
3884 parts: vec![],
3885 metadata: MessageMetadata::default(),
3886 },
3887 Message {
3888 role: Role::User,
3889 content: "[tool_result: call_sys_between]".to_string(),
3890 parts: vec![MessagePart::ToolResult {
3891 tool_use_id: tool_id.clone(),
3892 content: "output".to_string(),
3893 is_error: false,
3894 }],
3895 metadata: MessageMetadata::default(),
3896 },
3897 ];
3898
3899 let (removed, _db_ids) = strip_mid_history_orphans(&mut messages);
3900
3901 assert_eq!(
3902 removed, 0,
3903 "no messages should be removed when System sits between a matched tool_use/tool_result pair"
3904 );
3905 assert_eq!(messages.len(), 3, "all three messages must remain");
3906 assert!(
3907 messages[0]
3908 .parts
3909 .iter()
3910 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == &tool_id)),
3911 "ToolUse part must not be stripped from assistant message"
3912 );
3913 }
3914
3915 #[tokio::test]
3920 async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3921 use zeph_llm::provider::MessagePart;
3922
3923 let provider = mock_provider(vec![]);
3924 let channel = MockChannel::new(vec![]);
3925 let registry = create_test_registry();
3926 let executor = MockToolExecutor::no_tools();
3927
3928 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3929 let cid = memory.sqlite().create_conversation().await.unwrap();
3930 let sqlite = memory.sqlite();
3931
3932 sqlite
3936 .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3937 .await
3938 .unwrap();
3939
3940 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3942 tool_use_id: "corrupt".to_string(),
3943 content: "result".to_string(),
3944 is_error: false,
3945 }])
3946 .unwrap();
3947 sqlite
3948 .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3949 .await
3950 .unwrap();
3951
3952 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3953 std::sync::Arc::new(memory),
3954 cid,
3955 50,
3956 5,
3957 100,
3958 );
3959
3960 let base = agent.msg.messages.len();
3961 agent.load_history().await.unwrap();
3962
3963 let loaded = agent.msg.messages.len() - base;
3968 let orphan_present = agent.msg.messages.iter().any(|m| {
3971 m.role == Role::User
3972 && m.parts.iter().any(|p| {
3973 matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3974 })
3975 });
3976 assert!(
3977 !orphan_present,
3978 "orphaned ToolResult must not survive load_history; loaded={loaded}"
3979 );
3980 }
3981}