1use std::collections::HashSet;
5
6use crate::channel::Channel;
7use zeph_llm::provider::{LlmProvider as _, Message, MessagePart, Role};
8use zeph_memory::store::role_str;
9
10use super::Agent;
11
12fn sanitize_tool_pairs(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
28 let mut removed = 0;
29 let mut db_ids: Vec<i64> = Vec::new();
30
31 loop {
32 if let Some(last) = messages.last()
34 && last.role == Role::Assistant
35 && last
36 .parts
37 .iter()
38 .any(|p| matches!(p, MessagePart::ToolUse { .. }))
39 {
40 let ids: Vec<String> = last
41 .parts
42 .iter()
43 .filter_map(|p| {
44 if let MessagePart::ToolUse { id, .. } = p {
45 Some(id.clone())
46 } else {
47 None
48 }
49 })
50 .collect();
51 tracing::warn!(
52 tool_ids = ?ids,
53 "removing orphaned trailing tool_use message from restored history"
54 );
55 if let Some(db_id) = messages.last().and_then(|m| m.metadata.db_id) {
56 db_ids.push(db_id);
57 }
58 messages.pop();
59 removed += 1;
60 continue;
61 }
62
63 if let Some(first) = messages.first()
65 && first.role == Role::User
66 && first
67 .parts
68 .iter()
69 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
70 {
71 let ids: Vec<String> = first
72 .parts
73 .iter()
74 .filter_map(|p| {
75 if let MessagePart::ToolResult { tool_use_id, .. } = p {
76 Some(tool_use_id.clone())
77 } else {
78 None
79 }
80 })
81 .collect();
82 tracing::warn!(
83 tool_use_ids = ?ids,
84 "removing orphaned leading tool_result message from restored history"
85 );
86 if let Some(db_id) = messages.first().and_then(|m| m.metadata.db_id) {
87 db_ids.push(db_id);
88 }
89 messages.remove(0);
90 removed += 1;
91 continue;
92 }
93
94 break;
95 }
96
97 let (mid_removed, mid_db_ids) = strip_mid_history_orphans(messages);
100 removed += mid_removed;
101 db_ids.extend(mid_db_ids);
102
103 (removed, db_ids)
104}
105
106fn orphaned_tool_use_ids(msg: &Message, next_msg: Option<&Message>) -> HashSet<String> {
108 let matched: HashSet<String> = next_msg
109 .filter(|n| n.role == Role::User)
110 .map(|n| {
111 msg.parts
112 .iter()
113 .filter_map(|p| if let MessagePart::ToolUse { id, .. } = p { Some(id.clone()) } else { None })
114 .filter(|uid| n.parts.iter().any(|np| matches!(np, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == uid)))
115 .collect()
116 })
117 .unwrap_or_default();
118 msg.parts
119 .iter()
120 .filter_map(|p| {
121 if let MessagePart::ToolUse { id, .. } = p
122 && !matched.contains(id)
123 {
124 Some(id.clone())
125 } else {
126 None
127 }
128 })
129 .collect()
130}
131
132fn orphaned_tool_result_ids(msg: &Message, prev_msg: Option<&Message>) -> HashSet<String> {
134 let avail: HashSet<&str> = prev_msg
135 .filter(|p| p.role == Role::Assistant)
136 .map(|p| {
137 p.parts
138 .iter()
139 .filter_map(|part| {
140 if let MessagePart::ToolUse { id, .. } = part {
141 Some(id.as_str())
142 } else {
143 None
144 }
145 })
146 .collect()
147 })
148 .unwrap_or_default();
149 msg.parts
150 .iter()
151 .filter_map(|p| {
152 if let MessagePart::ToolResult { tool_use_id, .. } = p
153 && !avail.contains(tool_use_id.as_str())
154 {
155 Some(tool_use_id.clone())
156 } else {
157 None
158 }
159 })
160 .collect()
161}
162
/// Returns `true` when `content` contains text beyond tool-call transcript markers.
///
/// Three markers are recognised: `[tool_use: …]`, `[tool_result: …]` and
/// `[tool output: …]`. A `tool_use` marker is skipped on its own. A `tool_result` or
/// `tool output` marker is skipped together with the output that follows it, up to the
/// next marker or the end of the string. Any non-whitespace text outside those spans
/// counts as meaningful, and so does a marker that is never closed by `]`.
fn has_meaningful_content(content: &str) -> bool {
    const TAG_PREFIXES: [&str; 3] = ["[tool_use: ", "[tool_result: ", "[tool output: "];

    /// Finds the earliest marker prefix in `haystack`, returning its byte offset and the prefix.
    fn earliest_tag(haystack: &str) -> Option<(usize, &'static str)> {
        TAG_PREFIXES
            .iter()
            .filter_map(|&tag| haystack.find(tag).map(|at| (at, tag)))
            .min_by_key(|&(at, _)| at)
    }

    let mut rest = content.trim();

    while let Some((at, tag)) = earliest_tag(rest) {
        // Real text in front of the marker settles the question immediately.
        if !rest[..at].trim().is_empty() {
            return true;
        }

        let tag_body_start = at + tag.len();
        // An unterminated marker is treated as ordinary (meaningful) text.
        let Some(rel_close) = rest[tag_body_start..].find(']') else {
            return true;
        };
        let after_tag = &rest[tag_body_start + rel_close + 1..];

        rest = if tag == "[tool_use: " {
            after_tag
        } else {
            // Result/output markers own everything up to the next marker.
            let output = after_tag.trim_start_matches('\n');
            let skip_to = earliest_tag(output).map_or(output.len(), |(pos, _)| pos);
            &output[skip_to..]
        };
    }

    !rest.trim().is_empty()
}
232
233fn strip_mid_history_orphans(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
246 let mut removed = 0;
247 let mut db_ids: Vec<i64> = Vec::new();
248 let mut i = 0;
249 while i < messages.len() {
250 if messages[i].role == Role::Assistant
254 && messages[i]
255 .parts
256 .iter()
257 .any(|p| matches!(p, MessagePart::ToolUse { .. }))
258 {
259 let orphaned_ids = orphaned_tool_use_ids(&messages[i], messages.get(i + 1));
260 if !orphaned_ids.is_empty() {
261 tracing::warn!(
262 tool_ids = ?orphaned_ids,
263 index = i,
264 "stripping orphaned mid-history tool_use parts from assistant message"
265 );
266 messages[i].parts.retain(
267 |p| !matches!(p, MessagePart::ToolUse { id, .. } if orphaned_ids.contains(id)),
268 );
269 let is_empty =
270 !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
271 if is_empty {
272 if let Some(db_id) = messages[i].metadata.db_id {
273 db_ids.push(db_id);
274 }
275 messages.remove(i);
276 removed += 1;
277 continue; }
279 }
280 }
281
282 if messages[i].role == Role::User
284 && messages[i]
285 .parts
286 .iter()
287 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
288 {
289 let orphaned_ids = orphaned_tool_result_ids(
290 &messages[i],
291 if i > 0 { messages.get(i - 1) } else { None },
292 );
293 if !orphaned_ids.is_empty() {
294 tracing::warn!(
295 tool_use_ids = ?orphaned_ids,
296 index = i,
297 "stripping orphaned mid-history tool_result parts from user message"
298 );
299 messages[i].parts.retain(|p| {
300 !matches!(p, MessagePart::ToolResult { tool_use_id, .. } if orphaned_ids.contains(tool_use_id.as_str()))
301 });
302
303 let is_empty =
304 !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
305 if is_empty {
306 if let Some(db_id) = messages[i].metadata.db_id {
307 db_ids.push(db_id);
308 }
309 messages.remove(i);
310 removed += 1;
311 continue;
313 }
314 }
315 }
316
317 i += 1;
318 }
319 (removed, db_ids)
320}
321
322impl<C: Channel> Agent<C> {
323 pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
329 let (Some(memory), Some(cid)) = (
330 &self.memory_state.persistence.memory,
331 self.memory_state.persistence.conversation_id,
332 ) else {
333 return Ok(());
334 };
335
336 let history = memory
337 .sqlite()
338 .load_history_filtered(
339 cid,
340 self.memory_state.persistence.history_limit,
341 Some(true),
342 None,
343 )
344 .await?;
345 if !history.is_empty() {
346 let mut loaded = 0;
347 let mut skipped = 0;
348
349 for msg in history {
350 if !has_meaningful_content(&msg.content) && msg.parts.is_empty() {
355 tracing::warn!("skipping empty message from history (role: {:?})", msg.role);
356 skipped += 1;
357 continue;
358 }
359 self.msg.messages.push(msg);
360 loaded += 1;
361 }
362
363 let history_start = self.msg.messages.len() - loaded;
365 let mut restored_slice = self.msg.messages.split_off(history_start);
366 let (orphans, orphan_db_ids) = sanitize_tool_pairs(&mut restored_slice);
367 skipped += orphans;
368 loaded = loaded.saturating_sub(orphans);
369 self.msg.messages.append(&mut restored_slice);
370
371 if !orphan_db_ids.is_empty() {
372 let ids: Vec<zeph_memory::types::MessageId> = orphan_db_ids
373 .iter()
374 .map(|&id| zeph_memory::types::MessageId(id))
375 .collect();
376 if let Err(e) = memory.sqlite().soft_delete_messages(&ids).await {
377 tracing::warn!(
378 count = ids.len(),
379 error = %e,
380 "failed to soft-delete orphaned tool-pair messages from DB"
381 );
382 } else {
383 tracing::debug!(
384 count = ids.len(),
385 "soft-deleted orphaned tool-pair messages from DB"
386 );
387 }
388 }
389
390 tracing::info!("restored {loaded} message(s) from conversation {cid}");
391 if skipped > 0 {
392 tracing::warn!("skipped {skipped} empty/orphaned message(s) from history");
393 }
394
395 if loaded > 0 {
396 let _ = memory
399 .sqlite()
400 .increment_session_counts_for_conversation(cid)
401 .await
402 .inspect_err(|e| {
403 tracing::warn!(error = %e, "failed to increment tier session counts");
404 });
405 }
406 }
407
408 if let Ok(count) = memory.message_count(cid).await {
409 let count_u64 = u64::try_from(count).unwrap_or(0);
410 self.update_metrics(|m| {
411 m.sqlite_message_count = count_u64;
412 });
413 }
414
415 if let Ok(count) = memory.sqlite().count_semantic_facts().await {
416 let count_u64 = u64::try_from(count).unwrap_or(0);
417 self.update_metrics(|m| {
418 m.semantic_fact_count = count_u64;
419 });
420 }
421
422 if let Ok(count) = memory.unsummarized_message_count(cid).await {
423 self.memory_state.persistence.unsummarized_count = usize::try_from(count).unwrap_or(0);
424 }
425
426 self.recompute_prompt_tokens();
427 Ok(())
428 }
429
430 #[allow(clippy::too_many_lines)]
436 #[cfg_attr(
437 feature = "profiling",
438 tracing::instrument(name = "agent.persist_message", skip_all)
439 )]
440 pub(crate) async fn persist_message(
441 &mut self,
442 role: Role,
443 content: &str,
444 parts: &[MessagePart],
445 has_injection_flags: bool,
446 ) {
447 let (Some(memory), Some(cid)) = (
448 &self.memory_state.persistence.memory,
449 self.memory_state.persistence.conversation_id,
450 ) else {
451 return;
452 };
453
454 let parts_json = if parts.is_empty() {
455 "[]".to_string()
456 } else {
457 serde_json::to_string(parts).unwrap_or_else(|e| {
458 tracing::warn!("failed to serialize message parts, storing empty: {e}");
459 "[]".to_string()
460 })
461 };
462
463 let guard_event = self
467 .security
468 .exfiltration_guard
469 .should_guard_memory_write(has_injection_flags);
470 if let Some(ref event) = guard_event {
471 tracing::warn!(
472 ?event,
473 "exfiltration guard: skipping Qdrant embedding for flagged content"
474 );
475 self.update_metrics(|m| m.exfiltration_memory_guards += 1);
476 self.push_security_event(
477 crate::metrics::SecurityEventCategory::ExfiltrationBlock,
478 "memory_write",
479 "Qdrant embedding skipped: flagged content",
480 );
481 }
482
483 let skip_embedding = guard_event.is_some();
484
485 let has_skipped_tool_result = parts.iter().any(|p| {
489 if let MessagePart::ToolResult { content, .. } = p {
490 content.starts_with("[skipped]") || content.starts_with("[stopped]")
491 } else {
492 false
493 }
494 });
495
496 let should_embed = if skip_embedding || has_skipped_tool_result {
497 false
498 } else {
499 match role {
500 Role::Assistant => {
501 self.memory_state.persistence.autosave_assistant
502 && content.len() >= self.memory_state.persistence.autosave_min_length
503 }
504 _ => true,
505 }
506 };
507
508 let goal_text = self.memory_state.extraction.goal_text.clone();
509
510 tracing::debug!(
511 "persist_message: calling remember_with_parts, embed dispatched to background"
512 );
513 let (embedding_stored, was_persisted) = if should_embed {
514 match memory
515 .remember_with_parts(
516 cid,
517 role_str(role),
518 content,
519 &parts_json,
520 goal_text.as_deref(),
521 )
522 .await
523 {
524 Ok((Some(message_id), stored)) => {
525 self.msg.last_persisted_message_id = Some(message_id.0);
526 (stored, true)
527 }
528 Ok((None, _)) => {
529 return;
531 }
532 Err(e) => {
533 tracing::error!("failed to persist message: {e:#}");
534 return;
535 }
536 }
537 } else {
538 match memory
539 .save_only(cid, role_str(role), content, &parts_json)
540 .await
541 {
542 Ok(message_id) => {
543 self.msg.last_persisted_message_id = Some(message_id.0);
544 (false, true)
545 }
546 Err(e) => {
547 tracing::error!("failed to persist message: {e:#}");
548 return;
549 }
550 }
551 };
552
553 if !was_persisted {
554 return;
555 }
556
557 self.memory_state.persistence.unsummarized_count += 1;
558
559 self.update_metrics(|m| {
560 m.sqlite_message_count += 1;
561 if embedding_stored {
562 m.embeddings_generated += 1;
563 }
564 });
565
566 tracing::debug!("persist_message: db insert complete, embedding running in background");
567 memory.reap_embed_tasks();
568
569 self.enqueue_summarization_task();
573
574 let has_tool_result_parts = parts
577 .iter()
578 .any(|p| matches!(p, MessagePart::ToolResult { .. }));
579
580 self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
581 .await;
582
583 if role == Role::User && !has_tool_result_parts && !has_injection_flags {
585 self.enqueue_persona_extraction_task();
586 }
587
588 if has_tool_result_parts {
590 self.enqueue_trajectory_extraction_task();
591 }
592 }
593
594 fn enqueue_summarization_task(&mut self) {
596 let (Some(memory), Some(cid)) = (
597 self.memory_state.persistence.memory.clone(),
598 self.memory_state.persistence.conversation_id,
599 ) else {
600 return;
601 };
602
603 if self.memory_state.persistence.unsummarized_count
604 <= self.memory_state.compaction.summarization_threshold
605 {
606 return;
607 }
608
609 let batch_size = self.memory_state.compaction.summarization_threshold / 2;
610
611 self.lifecycle.supervisor.spawn_summarization("summarization", async move {
612 match tokio::time::timeout(
613 std::time::Duration::from_secs(30),
614 memory.summarize(cid, batch_size),
615 )
616 .await
617 {
618 Ok(Ok(Some(summary_id))) => {
619 tracing::info!(
620 "background summarization: created summary {summary_id} for conversation {cid}"
621 );
622 true
623 }
624 Ok(Ok(None)) => {
625 tracing::debug!("background summarization: no summarization needed");
626 false
627 }
628 Ok(Err(e)) => {
629 tracing::error!("background summarization failed: {e:#}");
630 false
631 }
632 Err(_) => {
633 tracing::warn!("background summarization timed out after 30s");
634 false
635 }
636 }
637 });
638 }
639
640 #[allow(clippy::too_many_lines)]
645 async fn enqueue_graph_extraction_task(
646 &mut self,
647 content: &str,
648 has_injection_flags: bool,
649 has_tool_result_parts: bool,
650 ) {
651 use zeph_memory::semantic::GraphExtractionConfig;
652
653 if self.memory_state.persistence.memory.is_none()
654 || self.memory_state.persistence.conversation_id.is_none()
655 {
656 return;
657 }
658 if has_tool_result_parts {
659 tracing::debug!("graph extraction skipped: message contains ToolResult parts");
660 return;
661 }
662 if has_injection_flags {
663 tracing::warn!("graph extraction skipped: injection patterns detected in content");
664 return;
665 }
666
667 let extraction_cfg = {
668 let cfg = &self.memory_state.extraction.graph_config;
669 if !cfg.enabled {
670 return;
671 }
672 GraphExtractionConfig {
673 max_entities: cfg.max_entities_per_message,
674 max_edges: cfg.max_edges_per_message,
675 extraction_timeout_secs: cfg.extraction_timeout_secs,
676 community_refresh_interval: cfg.community_refresh_interval,
677 expired_edge_retention_days: cfg.expired_edge_retention_days,
678 max_entities_cap: cfg.max_entities,
679 community_summary_max_prompt_bytes: cfg.community_summary_max_prompt_bytes,
680 community_summary_concurrency: cfg.community_summary_concurrency,
681 lpa_edge_chunk_size: cfg.lpa_edge_chunk_size,
682 note_linking: zeph_memory::NoteLinkingConfig {
683 enabled: cfg.note_linking.enabled,
684 similarity_threshold: cfg.note_linking.similarity_threshold,
685 top_k: cfg.note_linking.top_k,
686 timeout_secs: cfg.note_linking.timeout_secs,
687 },
688 link_weight_decay_lambda: cfg.link_weight_decay_lambda,
689 link_weight_decay_interval_secs: cfg.link_weight_decay_interval_secs,
690 belief_revision_enabled: cfg.belief_revision.enabled,
691 belief_revision_similarity_threshold: cfg.belief_revision.similarity_threshold,
692 conversation_id: self.memory_state.persistence.conversation_id.map(|c| c.0),
693 }
694 };
695
696 if self.rpe_should_skip(content).await {
699 tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
700 return;
701 }
702
703 let context_messages: Vec<String> = self
704 .msg
705 .messages
706 .iter()
707 .rev()
708 .filter(|m| {
709 m.role == Role::User
710 && !m
711 .parts
712 .iter()
713 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
714 })
715 .take(4)
716 .map(|m| {
717 if m.content.len() > 2048 {
718 m.content[..m.content.floor_char_boundary(2048)].to_owned()
719 } else {
720 m.content.clone()
721 }
722 })
723 .collect();
724
725 let Some(memory) = self.memory_state.persistence.memory.clone() else {
726 return;
727 };
728
729 let validator: zeph_memory::semantic::PostExtractValidator =
730 if self.security.memory_validator.is_enabled() {
731 let v = self.security.memory_validator.clone();
732 Some(Box::new(move |result| {
733 v.validate_graph_extraction(result)
734 .map_err(|e| e.to_string())
735 }))
736 } else {
737 None
738 };
739
740 let content_owned = content.to_owned();
741 let graph_store = memory.graph_store.clone();
742 let metrics_tx = self.metrics.metrics_tx.clone();
743 let start_time = self.lifecycle.start_time;
744
745 self.lifecycle.supervisor.spawn(
746 super::agent_supervisor::TaskClass::Enrichment,
747 "graph_extraction",
748 async move {
749 let extraction_handle = memory.spawn_graph_extraction(
750 content_owned,
751 context_messages,
752 extraction_cfg,
753 validator,
754 );
755
756 if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
758 let _ = extraction_handle.await;
759 let (entities, edges, communities) = tokio::join!(
760 store.entity_count(),
761 store.active_edge_count(),
762 store.community_count()
763 );
764 let elapsed = start_time.elapsed().as_secs();
765 tx.send_modify(|m| {
766 m.uptime_seconds = elapsed;
767 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
768 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
769 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
770 });
771 } else {
772 let _ = extraction_handle.await;
773 }
774
775 tracing::debug!("background graph extraction complete");
776 },
777 );
778
779 self.sync_community_detection_failures();
781 self.sync_graph_extraction_metrics();
782 let memory_for_sync = self.memory_state.persistence.memory.clone();
784 let metrics_tx_sync = self.metrics.metrics_tx.clone();
785 let start_time_sync = self.lifecycle.start_time;
786 let cid_sync = self.memory_state.persistence.conversation_id;
787 let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
788 let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
789 let guidelines_enabled = self.memory_state.extraction.graph_config.enabled;
790
791 self.lifecycle.supervisor.spawn(
792 super::agent_supervisor::TaskClass::Telemetry,
793 "graph_count_sync",
794 async move {
795 let Some(store) = graph_store_sync else {
796 return;
797 };
798 let Some(tx) = metrics_tx_sync else { return };
799
800 let (entities, edges, communities) = tokio::join!(
801 store.entity_count(),
802 store.active_edge_count(),
803 store.community_count()
804 );
805 let elapsed = start_time_sync.elapsed().as_secs();
806 tx.send_modify(|m| {
807 m.uptime_seconds = elapsed;
808 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
809 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
810 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
811 });
812
813 if guidelines_enabled && let Some(sqlite) = sqlite_sync {
815 match tokio::time::timeout(
816 std::time::Duration::from_secs(10),
817 sqlite.load_compression_guidelines_meta(cid_sync),
818 )
819 .await
820 {
821 Ok(Ok((version, created_at))) => {
822 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
823 let version_u32 = u32::try_from(version).unwrap_or(0);
824 tx.send_modify(|m| {
825 m.guidelines_version = version_u32;
826 m.guidelines_updated_at = created_at;
827 });
828 }
829 Ok(Err(e)) => {
830 tracing::debug!("guidelines status sync failed: {e:#}");
831 }
832 Err(_) => {
833 tracing::debug!("guidelines status sync timed out");
834 }
835 }
836 }
837 },
838 );
839 }
840
841 fn enqueue_persona_extraction_task(&mut self) {
843 use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
844
845 let cfg = &self.memory_state.extraction.persona_config;
846 if !cfg.enabled {
847 return;
848 }
849
850 let Some(memory) = &self.memory_state.persistence.memory else {
851 return;
852 };
853
854 let user_messages: Vec<String> = self
855 .msg
856 .messages
857 .iter()
858 .filter(|m| {
859 m.role == Role::User
860 && !m
861 .parts
862 .iter()
863 .any(|p| matches!(p, MessagePart::ToolResult { .. }))
864 })
865 .take(8)
866 .map(|m| {
867 if m.content.len() > 2048 {
868 m.content[..m.content.floor_char_boundary(2048)].to_owned()
869 } else {
870 m.content.clone()
871 }
872 })
873 .collect();
874
875 if user_messages.len() < cfg.min_messages {
876 return;
877 }
878
879 let timeout_secs = cfg.extraction_timeout_secs;
880 let extraction_cfg = PersonaExtractionConfig {
881 enabled: cfg.enabled,
882 min_messages: cfg.min_messages,
883 max_messages: cfg.max_messages,
884 extraction_timeout_secs: timeout_secs,
885 };
886
887 let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
888 let store = memory.sqlite().clone();
889 let conversation_id = self.memory_state.persistence.conversation_id.map(|c| c.0);
890
891 self.lifecycle.supervisor.spawn(
892 super::agent_supervisor::TaskClass::Enrichment,
893 "persona_extraction",
894 async move {
895 let user_message_refs: Vec<&str> =
896 user_messages.iter().map(String::as_str).collect();
897 let fut = extract_persona_facts(
898 &store,
899 &provider,
900 &user_message_refs,
901 &extraction_cfg,
902 conversation_id,
903 );
904 match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
905 {
906 Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
907 Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
908 Err(_) => tracing::warn!(
909 timeout_secs,
910 "persona extraction timed out — no facts written this turn"
911 ),
912 }
913 },
914 );
915 }
916
917 fn enqueue_trajectory_extraction_task(&mut self) {
919 use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
920
921 let cfg = self.memory_state.extraction.trajectory_config.clone();
922 if !cfg.enabled {
923 return;
924 }
925
926 let Some(memory) = &self.memory_state.persistence.memory else {
927 return;
928 };
929
930 let conversation_id = match self.memory_state.persistence.conversation_id {
931 Some(cid) => cid.0,
932 None => return,
933 };
934
935 let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
936 let turn_messages: Vec<zeph_llm::provider::Message> =
937 self.msg.messages[tail_start..].to_vec();
938
939 if turn_messages.is_empty() {
940 return;
941 }
942
943 let extraction_cfg = TrajectoryExtractionConfig {
944 enabled: cfg.enabled,
945 max_messages: cfg.max_messages,
946 extraction_timeout_secs: cfg.extraction_timeout_secs,
947 };
948
949 let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
950 let store = memory.sqlite().clone();
951 let min_confidence = cfg.min_confidence;
952
953 self.lifecycle.supervisor.spawn(
954 super::agent_supervisor::TaskClass::Enrichment,
955 "trajectory_extraction",
956 async move {
957 let entries =
958 match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
959 .await
960 {
961 Ok(e) => e,
962 Err(e) => {
963 tracing::warn!(error = %e, "trajectory extraction failed");
964 return;
965 }
966 };
967
968 let last_id = store
969 .trajectory_last_extracted_message_id(conversation_id)
970 .await
971 .unwrap_or(0);
972
973 let mut max_id = last_id;
974 for entry in &entries {
975 if entry.confidence < min_confidence {
976 continue;
977 }
978 let tools_json = serde_json::to_string(&entry.tools_used)
979 .unwrap_or_else(|_| "[]".to_string());
980 match store
981 .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
982 conversation_id: Some(conversation_id),
983 turn_index: 0,
984 kind: &entry.kind,
985 intent: &entry.intent,
986 outcome: &entry.outcome,
987 tools_used: &tools_json,
988 confidence: entry.confidence,
989 })
990 .await
991 {
992 Ok(id) => {
993 if id > max_id {
994 max_id = id;
995 }
996 }
997 Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
998 }
999 }
1000
1001 if max_id > last_id {
1002 let _ = store
1003 .set_trajectory_last_extracted_message_id(conversation_id, max_id)
1004 .await;
1005 }
1006
1007 tracing::debug!(
1008 count = entries.len(),
1009 conversation_id,
1010 "trajectory extraction complete"
1011 );
1012 },
1013 );
1014 }
1015
1016 async fn rpe_should_skip(&mut self, content: &str) -> bool {
1021 let Some(ref rpe_mutex) = self.memory_state.extraction.rpe_router else {
1022 return false;
1023 };
1024 let Some(memory) = &self.memory_state.persistence.memory else {
1025 return false;
1026 };
1027 let candidates = zeph_memory::extract_candidate_entities(content);
1028 let provider = memory.provider();
1029 let Ok(Ok(emb_vec)) =
1030 tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
1031 else {
1032 return false; };
1034 if let Ok(mut router) = rpe_mutex.lock() {
1035 let signal = router.compute(&emb_vec, &candidates);
1036 router.push_embedding(emb_vec);
1037 router.push_entities(&candidates);
1038 !signal.should_extract
1039 } else {
1040 tracing::warn!("rpe_router mutex poisoned; falling through to extract");
1041 false
1042 }
1043 }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048 use super::super::agent_tests::{
1049 MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
1050 };
1051 use super::*;
1052 use zeph_llm::any::AnyProvider;
1053 use zeph_memory::semantic::SemanticMemory;
1054
1055 async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
1056 SemanticMemory::new(
1057 ":memory:",
1058 "http://127.0.0.1:1",
1059 provider.clone(),
1060 "test-model",
1061 )
1062 .await
1063 .unwrap()
1064 }
1065
1066 #[tokio::test]
1067 async fn load_history_without_memory_returns_ok() {
1068 let provider = mock_provider(vec![]);
1069 let channel = MockChannel::new(vec![]);
1070 let registry = create_test_registry();
1071 let executor = MockToolExecutor::no_tools();
1072 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1073
1074 let result = agent.load_history().await;
1075 assert!(result.is_ok());
1076 assert_eq!(agent.msg.messages.len(), 1); }
1079
1080 #[tokio::test]
1081 async fn load_history_with_messages_injects_into_agent() {
1082 let provider = mock_provider(vec![]);
1083 let channel = MockChannel::new(vec![]);
1084 let registry = create_test_registry();
1085 let executor = MockToolExecutor::no_tools();
1086
1087 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1088 let cid = memory.sqlite().create_conversation().await.unwrap();
1089
1090 memory
1091 .sqlite()
1092 .save_message(cid, "user", "hello from history")
1093 .await
1094 .unwrap();
1095 memory
1096 .sqlite()
1097 .save_message(cid, "assistant", "hi back")
1098 .await
1099 .unwrap();
1100
1101 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1102 std::sync::Arc::new(memory),
1103 cid,
1104 50,
1105 5,
1106 100,
1107 );
1108
1109 let messages_before = agent.msg.messages.len();
1110 agent.load_history().await.unwrap();
1111 assert_eq!(agent.msg.messages.len(), messages_before + 2);
1113 }
1114
1115 #[tokio::test]
1116 async fn load_history_skips_empty_messages() {
1117 let provider = mock_provider(vec![]);
1118 let channel = MockChannel::new(vec![]);
1119 let registry = create_test_registry();
1120 let executor = MockToolExecutor::no_tools();
1121
1122 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1123 let cid = memory.sqlite().create_conversation().await.unwrap();
1124
1125 memory
1127 .sqlite()
1128 .save_message(cid, "user", " ")
1129 .await
1130 .unwrap();
1131 memory
1132 .sqlite()
1133 .save_message(cid, "user", "real message")
1134 .await
1135 .unwrap();
1136
1137 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1138 std::sync::Arc::new(memory),
1139 cid,
1140 50,
1141 5,
1142 100,
1143 );
1144
1145 let messages_before = agent.msg.messages.len();
1146 agent.load_history().await.unwrap();
1147 assert_eq!(agent.msg.messages.len(), messages_before + 1);
1149 }
1150
1151 #[tokio::test]
1152 async fn load_history_with_empty_store_returns_ok() {
1153 let provider = mock_provider(vec![]);
1154 let channel = MockChannel::new(vec![]);
1155 let registry = create_test_registry();
1156 let executor = MockToolExecutor::no_tools();
1157
1158 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1159 let cid = memory.sqlite().create_conversation().await.unwrap();
1160
1161 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1162 std::sync::Arc::new(memory),
1163 cid,
1164 50,
1165 5,
1166 100,
1167 );
1168
1169 let messages_before = agent.msg.messages.len();
1170 agent.load_history().await.unwrap();
1171 assert_eq!(agent.msg.messages.len(), messages_before);
1173 }
1174
1175 #[tokio::test]
1176 async fn load_history_increments_session_count_for_existing_messages() {
1177 let provider = mock_provider(vec![]);
1178 let channel = MockChannel::new(vec![]);
1179 let registry = create_test_registry();
1180 let executor = MockToolExecutor::no_tools();
1181
1182 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1183 let cid = memory.sqlite().create_conversation().await.unwrap();
1184
1185 let id1 = memory
1187 .sqlite()
1188 .save_message(cid, "user", "hello")
1189 .await
1190 .unwrap();
1191 let id2 = memory
1192 .sqlite()
1193 .save_message(cid, "assistant", "hi")
1194 .await
1195 .unwrap();
1196
1197 let memory_arc = std::sync::Arc::new(memory);
1198 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1199 memory_arc.clone(),
1200 cid,
1201 50,
1202 5,
1203 100,
1204 );
1205
1206 agent.load_history().await.unwrap();
1207
1208 let counts: Vec<i64> = zeph_db::query_scalar(
1210 "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
1211 )
1212 .bind(id1)
1213 .bind(id2)
1214 .fetch_all(memory_arc.sqlite().pool())
1215 .await
1216 .unwrap();
1217 assert_eq!(
1218 counts,
1219 vec![1, 1],
1220 "session_count must be 1 after first restore"
1221 );
1222 }
1223
1224 #[tokio::test]
1225 async fn load_history_does_not_increment_session_count_for_new_conversation() {
1226 let provider = mock_provider(vec![]);
1227 let channel = MockChannel::new(vec![]);
1228 let registry = create_test_registry();
1229 let executor = MockToolExecutor::no_tools();
1230
1231 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1232 let cid = memory.sqlite().create_conversation().await.unwrap();
1233
1234 let memory_arc = std::sync::Arc::new(memory);
1236 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1237 memory_arc.clone(),
1238 cid,
1239 50,
1240 5,
1241 100,
1242 );
1243
1244 agent.load_history().await.unwrap();
1245
1246 let counts: Vec<i64> =
1248 zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
1249 .bind(cid)
1250 .fetch_all(memory_arc.sqlite().pool())
1251 .await
1252 .unwrap();
1253 assert!(counts.is_empty(), "new conversation must have no messages");
1254 }
1255
1256 #[tokio::test]
1257 async fn persist_message_without_memory_silently_returns() {
1258 let provider = mock_provider(vec![]);
1260 let channel = MockChannel::new(vec![]);
1261 let registry = create_test_registry();
1262 let executor = MockToolExecutor::no_tools();
1263 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1264
1265 agent.persist_message(Role::User, "hello", &[], false).await;
1267 }
1268
1269 #[tokio::test]
1270 async fn persist_message_assistant_autosave_false_uses_save_only() {
1271 let provider = mock_provider(vec![]);
1272 let channel = MockChannel::new(vec![]);
1273 let registry = create_test_registry();
1274 let executor = MockToolExecutor::no_tools();
1275
1276 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1277 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1278 let cid = memory.sqlite().create_conversation().await.unwrap();
1279
1280 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1281 .with_metrics(tx)
1282 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1283 agent.memory_state.persistence.autosave_assistant = false;
1284 agent.memory_state.persistence.autosave_min_length = 20;
1285
1286 agent
1287 .persist_message(Role::Assistant, "short assistant reply", &[], false)
1288 .await;
1289
1290 let history = agent
1291 .memory_state
1292 .persistence
1293 .memory
1294 .as_ref()
1295 .unwrap()
1296 .sqlite()
1297 .load_history(cid, 50)
1298 .await
1299 .unwrap();
1300 assert_eq!(history.len(), 1, "message must be saved");
1301 assert_eq!(history[0].content, "short assistant reply");
1302 assert_eq!(rx.borrow().embeddings_generated, 0);
1304 }
1305
1306 #[tokio::test]
1307 async fn persist_message_assistant_below_min_length_uses_save_only() {
1308 let provider = mock_provider(vec![]);
1309 let channel = MockChannel::new(vec![]);
1310 let registry = create_test_registry();
1311 let executor = MockToolExecutor::no_tools();
1312
1313 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1314 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1315 let cid = memory.sqlite().create_conversation().await.unwrap();
1316
1317 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1319 .with_metrics(tx)
1320 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1321 agent.memory_state.persistence.autosave_assistant = true;
1322 agent.memory_state.persistence.autosave_min_length = 1000;
1323
1324 agent
1325 .persist_message(Role::Assistant, "too short", &[], false)
1326 .await;
1327
1328 let history = agent
1329 .memory_state
1330 .persistence
1331 .memory
1332 .as_ref()
1333 .unwrap()
1334 .sqlite()
1335 .load_history(cid, 50)
1336 .await
1337 .unwrap();
1338 assert_eq!(history.len(), 1, "message must be saved");
1339 assert_eq!(history[0].content, "too short");
1340 assert_eq!(rx.borrow().embeddings_generated, 0);
1341 }
1342
1343 #[tokio::test]
1344 async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1345 let provider = mock_provider(vec![]);
1347 let channel = MockChannel::new(vec![]);
1348 let registry = create_test_registry();
1349 let executor = MockToolExecutor::no_tools();
1350
1351 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1352 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1353 let cid = memory.sqlite().create_conversation().await.unwrap();
1354
1355 let min_length = 10usize;
1356 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1357 .with_metrics(tx)
1358 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1359 agent.memory_state.persistence.autosave_assistant = true;
1360 agent.memory_state.persistence.autosave_min_length = min_length;
1361
1362 let content_at_boundary = "A".repeat(min_length);
1364 assert_eq!(content_at_boundary.len(), min_length);
1365 agent
1366 .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1367 .await;
1368
1369 assert_eq!(rx.borrow().sqlite_message_count, 1);
1371 }
1372
1373 #[tokio::test]
1374 async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1375 let provider = mock_provider(vec![]);
1377 let channel = MockChannel::new(vec![]);
1378 let registry = create_test_registry();
1379 let executor = MockToolExecutor::no_tools();
1380
1381 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1382 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1383 let cid = memory.sqlite().create_conversation().await.unwrap();
1384
1385 let min_length = 10usize;
1386 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1387 .with_metrics(tx)
1388 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1389 agent.memory_state.persistence.autosave_assistant = true;
1390 agent.memory_state.persistence.autosave_min_length = min_length;
1391
1392 let content_below_boundary = "A".repeat(min_length - 1);
1394 assert_eq!(content_below_boundary.len(), min_length - 1);
1395 agent
1396 .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1397 .await;
1398
1399 let history = agent
1400 .memory_state
1401 .persistence
1402 .memory
1403 .as_ref()
1404 .unwrap()
1405 .sqlite()
1406 .load_history(cid, 50)
1407 .await
1408 .unwrap();
1409 assert_eq!(history.len(), 1, "message must still be saved");
1410 assert_eq!(rx.borrow().embeddings_generated, 0);
1412 }
1413
1414 #[tokio::test]
1415 async fn persist_message_increments_unsummarized_count() {
1416 let provider = mock_provider(vec![]);
1417 let channel = MockChannel::new(vec![]);
1418 let registry = create_test_registry();
1419 let executor = MockToolExecutor::no_tools();
1420
1421 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1422 let cid = memory.sqlite().create_conversation().await.unwrap();
1423
1424 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1426 std::sync::Arc::new(memory),
1427 cid,
1428 50,
1429 5,
1430 100,
1431 );
1432
1433 assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1434
1435 agent.persist_message(Role::User, "first", &[], false).await;
1436 assert_eq!(agent.memory_state.persistence.unsummarized_count, 1);
1437
1438 agent
1439 .persist_message(Role::User, "second", &[], false)
1440 .await;
1441 assert_eq!(agent.memory_state.persistence.unsummarized_count, 2);
1442 }
1443
1444 #[tokio::test]
1445 async fn check_summarization_resets_counter_on_success() {
1446 let provider = mock_provider(vec![]);
1447 let channel = MockChannel::new(vec![]);
1448 let registry = create_test_registry();
1449 let executor = MockToolExecutor::no_tools();
1450
1451 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1452 let cid = memory.sqlite().create_conversation().await.unwrap();
1453
1454 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1456 std::sync::Arc::new(memory),
1457 cid,
1458 50,
1459 5,
1460 1,
1461 );
1462
1463 agent.persist_message(Role::User, "msg1", &[], false).await;
1464 agent.persist_message(Role::User, "msg2", &[], false).await;
1465
1466 assert!(agent.memory_state.persistence.unsummarized_count <= 2);
1471 }
1472
1473 #[tokio::test]
1474 async fn unsummarized_count_not_incremented_without_memory() {
1475 let provider = mock_provider(vec![]);
1476 let channel = MockChannel::new(vec![]);
1477 let registry = create_test_registry();
1478 let executor = MockToolExecutor::no_tools();
1479 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1480
1481 agent.persist_message(Role::User, "hello", &[], false).await;
1482 assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1484 }
1485
1486 mod graph_extraction_guards {
1488 use super::*;
1489 use crate::config::GraphConfig;
1490 use zeph_llm::provider::MessageMetadata;
1491 use zeph_memory::graph::GraphStore;
1492
1493 fn enabled_graph_config() -> GraphConfig {
1494 GraphConfig {
1495 enabled: true,
1496 ..GraphConfig::default()
1497 }
1498 }
1499
1500 async fn agent_with_graph(
1501 provider: &AnyProvider,
1502 config: GraphConfig,
1503 ) -> Agent<MockChannel> {
1504 let memory =
1505 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1506 let cid = memory.sqlite().create_conversation().await.unwrap();
1507 Agent::new(
1508 provider.clone(),
1509 MockChannel::new(vec![]),
1510 create_test_registry(),
1511 None,
1512 5,
1513 MockToolExecutor::no_tools(),
1514 )
1515 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1516 .with_graph_config(config)
1517 }
1518
1519 #[tokio::test]
1520 async fn injection_flag_guard_skips_extraction() {
1521 let provider = mock_provider(vec![]);
1523 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1524 let pool = agent
1525 .memory_state
1526 .persistence
1527 .memory
1528 .as_ref()
1529 .unwrap()
1530 .sqlite()
1531 .pool()
1532 .clone();
1533
1534 agent
1535 .enqueue_graph_extraction_task("I use Rust", true, false)
1536 .await;
1537
1538 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1540
1541 let store = GraphStore::new(pool);
1542 let count = store.get_metadata("extraction_count").await.unwrap();
1543 assert!(
1544 count.is_none(),
1545 "injection flag must prevent extraction_count from being written"
1546 );
1547 }
1548
1549 #[tokio::test]
1550 async fn disabled_config_guard_skips_extraction() {
1551 let provider = mock_provider(vec![]);
1553 let disabled_cfg = GraphConfig {
1554 enabled: false,
1555 ..GraphConfig::default()
1556 };
1557 let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1558 let pool = agent
1559 .memory_state
1560 .persistence
1561 .memory
1562 .as_ref()
1563 .unwrap()
1564 .sqlite()
1565 .pool()
1566 .clone();
1567
1568 agent
1569 .enqueue_graph_extraction_task("I use Rust", false, false)
1570 .await;
1571
1572 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1573
1574 let store = GraphStore::new(pool);
1575 let count = store.get_metadata("extraction_count").await.unwrap();
1576 assert!(
1577 count.is_none(),
1578 "disabled graph config must prevent extraction"
1579 );
1580 }
1581
1582 #[tokio::test]
1583 async fn happy_path_fires_extraction() {
1584 let provider = mock_provider(vec![]);
1587 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1588 let pool = agent
1589 .memory_state
1590 .persistence
1591 .memory
1592 .as_ref()
1593 .unwrap()
1594 .sqlite()
1595 .pool()
1596 .clone();
1597
1598 agent
1599 .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1600 .await;
1601
1602 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1604
1605 let store = GraphStore::new(pool);
1606 let count = store.get_metadata("extraction_count").await.unwrap();
1607 assert!(
1608 count.is_some(),
1609 "happy-path extraction must increment extraction_count"
1610 );
1611 }
1612
1613 #[tokio::test]
1614 async fn tool_result_parts_guard_skips_extraction() {
1615 let provider = mock_provider(vec![]);
1619 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1620 let pool = agent
1621 .memory_state
1622 .persistence
1623 .memory
1624 .as_ref()
1625 .unwrap()
1626 .sqlite()
1627 .pool()
1628 .clone();
1629
1630 agent
1631 .enqueue_graph_extraction_task(
1632 "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1633 false,
1634 true, )
1636 .await;
1637
1638 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1639
1640 let store = GraphStore::new(pool);
1641 let count = store.get_metadata("extraction_count").await.unwrap();
1642 assert!(
1643 count.is_none(),
1644 "tool result message must not trigger graph extraction"
1645 );
1646 }
1647
1648 #[tokio::test]
1649 async fn context_filter_excludes_tool_result_messages() {
1650 let provider = mock_provider(vec![]);
1661 let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1662
1663 agent.msg.messages.push(Message {
1666 role: Role::User,
1667 content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1668 parts: vec![MessagePart::ToolResult {
1669 tool_use_id: "abc".to_owned(),
1670 content: "provider_type = \"openai\"".to_owned(),
1671 is_error: false,
1672 }],
1673 metadata: MessageMetadata::default(),
1674 });
1675
1676 let pool = agent
1677 .memory_state
1678 .persistence
1679 .memory
1680 .as_ref()
1681 .unwrap()
1682 .sqlite()
1683 .pool()
1684 .clone();
1685
1686 agent
1688 .enqueue_graph_extraction_task(
1689 "I prefer Rust for systems programming",
1690 false,
1691 false,
1692 )
1693 .await;
1694
1695 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1696
1697 let store = GraphStore::new(pool);
1699 let count = store.get_metadata("extraction_count").await.unwrap();
1700 assert!(
1701 count.is_some(),
1702 "conversational message must trigger extraction even with prior tool result in history"
1703 );
1704 }
1705 }
1706
1707 mod persona_extraction_guards {
1709 use super::*;
1710 use zeph_config::PersonaConfig;
1711 use zeph_llm::provider::MessageMetadata;
1712
1713 fn enabled_persona_config() -> PersonaConfig {
1714 PersonaConfig {
1715 enabled: true,
1716 min_messages: 1,
1717 ..PersonaConfig::default()
1718 }
1719 }
1720
1721 async fn agent_with_persona(
1722 provider: &AnyProvider,
1723 config: PersonaConfig,
1724 ) -> Agent<MockChannel> {
1725 let memory =
1726 test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1727 let cid = memory.sqlite().create_conversation().await.unwrap();
1728 let mut agent = Agent::new(
1729 provider.clone(),
1730 MockChannel::new(vec![]),
1731 create_test_registry(),
1732 None,
1733 5,
1734 MockToolExecutor::no_tools(),
1735 )
1736 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1737 agent.memory_state.extraction.persona_config = config;
1738 agent
1739 }
1740
1741 #[tokio::test]
1742 async fn disabled_config_skips_spawn() {
1743 let provider = mock_provider(vec![]);
1745 let mut agent = agent_with_persona(
1746 &provider,
1747 PersonaConfig {
1748 enabled: false,
1749 ..PersonaConfig::default()
1750 },
1751 )
1752 .await;
1753
1754 agent.msg.messages.push(zeph_llm::provider::Message {
1756 role: Role::User,
1757 content: "I prefer Rust for systems programming".to_owned(),
1758 parts: vec![],
1759 metadata: MessageMetadata::default(),
1760 });
1761
1762 agent.enqueue_persona_extraction_task();
1763
1764 let store = agent
1765 .memory_state
1766 .persistence
1767 .memory
1768 .as_ref()
1769 .unwrap()
1770 .sqlite()
1771 .clone();
1772 let count = store.count_persona_facts().await.unwrap();
1773 assert_eq!(count, 0, "disabled persona config must not write any facts");
1774 }
1775
1776 #[tokio::test]
1777 async fn below_min_messages_skips_spawn() {
1778 let provider = mock_provider(vec![]);
1780 let mut agent = agent_with_persona(
1781 &provider,
1782 PersonaConfig {
1783 enabled: true,
1784 min_messages: 3,
1785 ..PersonaConfig::default()
1786 },
1787 )
1788 .await;
1789
1790 for text in ["I use Rust", "I prefer async code"] {
1791 agent.msg.messages.push(zeph_llm::provider::Message {
1792 role: Role::User,
1793 content: text.to_owned(),
1794 parts: vec![],
1795 metadata: MessageMetadata::default(),
1796 });
1797 }
1798
1799 agent.enqueue_persona_extraction_task();
1800
1801 let store = agent
1802 .memory_state
1803 .persistence
1804 .memory
1805 .as_ref()
1806 .unwrap()
1807 .sqlite()
1808 .clone();
1809 let count = store.count_persona_facts().await.unwrap();
1810 assert_eq!(
1811 count, 0,
1812 "below min_messages threshold must not trigger extraction"
1813 );
1814 }
1815
1816 #[tokio::test]
1817 async fn no_memory_skips_spawn() {
1818 let provider = mock_provider(vec![]);
1820 let channel = MockChannel::new(vec![]);
1821 let registry = create_test_registry();
1822 let executor = MockToolExecutor::no_tools();
1823 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1824 agent.memory_state.extraction.persona_config = enabled_persona_config();
1825 agent.msg.messages.push(zeph_llm::provider::Message {
1826 role: Role::User,
1827 content: "I like Rust".to_owned(),
1828 parts: vec![],
1829 metadata: MessageMetadata::default(),
1830 });
1831
1832 agent.enqueue_persona_extraction_task();
1834 }
1835
1836 #[tokio::test]
1837 async fn enabled_enough_messages_spawns_extraction() {
1838 use zeph_llm::mock::MockProvider;
1841 let (mock, recorded) = MockProvider::default().with_recording();
1842 let provider = AnyProvider::Mock(mock);
1843 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1844
1845 agent.msg.messages.push(zeph_llm::provider::Message {
1846 role: Role::User,
1847 content: "I prefer Rust for systems programming".to_owned(),
1848 parts: vec![],
1849 metadata: MessageMetadata::default(),
1850 });
1851
1852 agent.enqueue_persona_extraction_task();
1853
1854 agent.lifecycle.supervisor.join_all_for_test().await;
1856
1857 let calls = recorded.lock().unwrap();
1858 assert!(
1859 !calls.is_empty(),
1860 "happy-path: provider.chat() must be called when extraction completes"
1861 );
1862 }
1863
1864 #[tokio::test]
1865 async fn messages_capped_at_eight() {
1866 use zeph_llm::mock::MockProvider;
1869 let (mock, recorded) = MockProvider::default().with_recording();
1870 let provider = AnyProvider::Mock(mock);
1871 let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1872
1873 for i in 0..12u32 {
1874 agent.msg.messages.push(zeph_llm::provider::Message {
1875 role: Role::User,
1876 content: format!("I like message {i}"),
1877 parts: vec![],
1878 metadata: MessageMetadata::default(),
1879 });
1880 }
1881
1882 agent.enqueue_persona_extraction_task();
1883
1884 agent.lifecycle.supervisor.join_all_for_test().await;
1886
1887 let calls = recorded.lock().unwrap();
1889 assert!(
1890 !calls.is_empty(),
1891 "extraction must run when enough messages present"
1892 );
1893 let prompt = &calls[0];
1895 let user_text = prompt
1896 .iter()
1897 .filter(|m| m.role == Role::User)
1898 .map(|m| m.content.as_str())
1899 .collect::<Vec<_>>()
1900 .join(" ");
1901 assert!(
1903 !user_text.contains("I like message 8"),
1904 "message index 8 must be excluded from extraction input"
1905 );
1906 }
1907
1908 #[test]
1909 fn long_message_truncated_at_char_boundary() {
1910 let long_content = "x".repeat(3000);
1914 let truncated = if long_content.len() > 2048 {
1915 long_content[..long_content.floor_char_boundary(2048)].to_owned()
1916 } else {
1917 long_content.clone()
1918 };
1919 assert_eq!(
1920 truncated.len(),
1921 2048,
1922 "ASCII content must be truncated to exactly 2048 bytes"
1923 );
1924
1925 let multi = "é".repeat(1500); let truncated_multi = if multi.len() > 2048 {
1928 multi[..multi.floor_char_boundary(2048)].to_owned()
1929 } else {
1930 multi.clone()
1931 };
1932 assert!(
1933 truncated_multi.len() <= 2048,
1934 "multi-byte content must not exceed 2048 bytes"
1935 );
1936 assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1937 }
1938 }
1939
1940 #[tokio::test]
1941 async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1942 let provider = mock_provider(vec![]);
1943 let channel = MockChannel::new(vec![]);
1944 let registry = create_test_registry();
1945 let executor = MockToolExecutor::no_tools();
1946
1947 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1948 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1949 let cid = memory.sqlite().create_conversation().await.unwrap();
1950
1951 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1953 .with_metrics(tx)
1954 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1955 agent.memory_state.persistence.autosave_assistant = false;
1956 agent.memory_state.persistence.autosave_min_length = 20;
1957
1958 let long_user_msg = "A".repeat(100);
1959 agent
1960 .persist_message(Role::User, &long_user_msg, &[], false)
1961 .await;
1962
1963 let history = agent
1964 .memory_state
1965 .persistence
1966 .memory
1967 .as_ref()
1968 .unwrap()
1969 .sqlite()
1970 .load_history(cid, 50)
1971 .await
1972 .unwrap();
1973 assert_eq!(history.len(), 1, "user message must be saved");
1974 assert_eq!(rx.borrow().sqlite_message_count, 1);
1977 }
1978
1979 #[tokio::test]
1983 async fn persist_message_saves_correct_tool_use_parts() {
1984 use zeph_llm::provider::MessagePart;
1985
1986 let provider = mock_provider(vec![]);
1987 let channel = MockChannel::new(vec![]);
1988 let registry = create_test_registry();
1989 let executor = MockToolExecutor::no_tools();
1990
1991 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1992 let cid = memory.sqlite().create_conversation().await.unwrap();
1993
1994 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1995 std::sync::Arc::new(memory),
1996 cid,
1997 50,
1998 5,
1999 100,
2000 );
2001
2002 let parts = vec![MessagePart::ToolUse {
2003 id: "call_abc123".to_string(),
2004 name: "read_file".to_string(),
2005 input: serde_json::json!({"path": "/tmp/test.txt"}),
2006 }];
2007 let content = "[tool_use: read_file(call_abc123)]";
2008
2009 agent
2010 .persist_message(Role::Assistant, content, &parts, false)
2011 .await;
2012
2013 let history = agent
2014 .memory_state
2015 .persistence
2016 .memory
2017 .as_ref()
2018 .unwrap()
2019 .sqlite()
2020 .load_history(cid, 50)
2021 .await
2022 .unwrap();
2023
2024 assert_eq!(history.len(), 1);
2025 assert_eq!(history[0].role, Role::Assistant);
2026 assert_eq!(history[0].content, content);
2027 assert_eq!(history[0].parts.len(), 1);
2028 match &history[0].parts[0] {
2029 MessagePart::ToolUse { id, name, .. } => {
2030 assert_eq!(id, "call_abc123");
2031 assert_eq!(name, "read_file");
2032 }
2033 other => panic!("expected ToolUse part, got {other:?}"),
2034 }
2035 assert!(
2037 !history[0]
2038 .parts
2039 .iter()
2040 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2041 "assistant message must not contain ToolResult parts"
2042 );
2043 }
2044
2045 #[tokio::test]
2046 async fn persist_message_saves_correct_tool_result_parts() {
2047 use zeph_llm::provider::MessagePart;
2048
2049 let provider = mock_provider(vec![]);
2050 let channel = MockChannel::new(vec![]);
2051 let registry = create_test_registry();
2052 let executor = MockToolExecutor::no_tools();
2053
2054 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2055 let cid = memory.sqlite().create_conversation().await.unwrap();
2056
2057 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2058 std::sync::Arc::new(memory),
2059 cid,
2060 50,
2061 5,
2062 100,
2063 );
2064
2065 let parts = vec![MessagePart::ToolResult {
2066 tool_use_id: "call_abc123".to_string(),
2067 content: "file contents here".to_string(),
2068 is_error: false,
2069 }];
2070 let content = "[tool_result: call_abc123]\nfile contents here";
2071
2072 agent
2073 .persist_message(Role::User, content, &parts, false)
2074 .await;
2075
2076 let history = agent
2077 .memory_state
2078 .persistence
2079 .memory
2080 .as_ref()
2081 .unwrap()
2082 .sqlite()
2083 .load_history(cid, 50)
2084 .await
2085 .unwrap();
2086
2087 assert_eq!(history.len(), 1);
2088 assert_eq!(history[0].role, Role::User);
2089 assert_eq!(history[0].content, content);
2090 assert_eq!(history[0].parts.len(), 1);
2091 match &history[0].parts[0] {
2092 MessagePart::ToolResult {
2093 tool_use_id,
2094 content: result_content,
2095 is_error,
2096 } => {
2097 assert_eq!(tool_use_id, "call_abc123");
2098 assert_eq!(result_content, "file contents here");
2099 assert!(!is_error);
2100 }
2101 other => panic!("expected ToolResult part, got {other:?}"),
2102 }
2103 assert!(
2105 !history[0]
2106 .parts
2107 .iter()
2108 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2109 "user ToolResult message must not contain ToolUse parts"
2110 );
2111 }
2112
2113 #[tokio::test]
2114 async fn persist_message_roundtrip_preserves_role_part_alignment() {
2115 use zeph_llm::provider::MessagePart;
2116
2117 let provider = mock_provider(vec![]);
2118 let channel = MockChannel::new(vec![]);
2119 let registry = create_test_registry();
2120 let executor = MockToolExecutor::no_tools();
2121
2122 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2123 let cid = memory.sqlite().create_conversation().await.unwrap();
2124
2125 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2126 std::sync::Arc::new(memory),
2127 cid,
2128 50,
2129 5,
2130 100,
2131 );
2132
2133 let assistant_parts = vec![MessagePart::ToolUse {
2135 id: "id_1".to_string(),
2136 name: "list_dir".to_string(),
2137 input: serde_json::json!({"path": "/tmp"}),
2138 }];
2139 agent
2140 .persist_message(
2141 Role::Assistant,
2142 "[tool_use: list_dir(id_1)]",
2143 &assistant_parts,
2144 false,
2145 )
2146 .await;
2147
2148 let user_parts = vec![MessagePart::ToolResult {
2150 tool_use_id: "id_1".to_string(),
2151 content: "file1.txt\nfile2.txt".to_string(),
2152 is_error: false,
2153 }];
2154 agent
2155 .persist_message(
2156 Role::User,
2157 "[tool_result: id_1]\nfile1.txt\nfile2.txt",
2158 &user_parts,
2159 false,
2160 )
2161 .await;
2162
2163 let history = agent
2164 .memory_state
2165 .persistence
2166 .memory
2167 .as_ref()
2168 .unwrap()
2169 .sqlite()
2170 .load_history(cid, 50)
2171 .await
2172 .unwrap();
2173
2174 assert_eq!(history.len(), 2);
2175
2176 assert_eq!(history[0].role, Role::Assistant);
2178 assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
2179 assert!(
2180 matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
2181 "first message must be assistant ToolUse"
2182 );
2183
2184 assert_eq!(history[1].role, Role::User);
2186 assert_eq!(
2187 history[1].content,
2188 "[tool_result: id_1]\nfile1.txt\nfile2.txt"
2189 );
2190 assert!(
2191 matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
2192 "second message must be user ToolResult"
2193 );
2194
2195 assert!(
2197 !history[0]
2198 .parts
2199 .iter()
2200 .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2201 "assistant message must not have ToolResult parts"
2202 );
2203 assert!(
2204 !history[1]
2205 .parts
2206 .iter()
2207 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2208 "user message must not have ToolUse parts"
2209 );
2210 }
2211
2212 #[tokio::test]
2213 async fn persist_message_saves_correct_tool_output_parts() {
2214 use zeph_llm::provider::MessagePart;
2215
2216 let provider = mock_provider(vec![]);
2217 let channel = MockChannel::new(vec![]);
2218 let registry = create_test_registry();
2219 let executor = MockToolExecutor::no_tools();
2220
2221 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2222 let cid = memory.sqlite().create_conversation().await.unwrap();
2223
2224 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2225 std::sync::Arc::new(memory),
2226 cid,
2227 50,
2228 5,
2229 100,
2230 );
2231
2232 let parts = vec![MessagePart::ToolOutput {
2233 tool_name: "shell".into(),
2234 body: "hello from shell".to_string(),
2235 compacted_at: None,
2236 }];
2237 let content = "[tool: shell]\nhello from shell";
2238
2239 agent
2240 .persist_message(Role::User, content, &parts, false)
2241 .await;
2242
2243 let history = agent
2244 .memory_state
2245 .persistence
2246 .memory
2247 .as_ref()
2248 .unwrap()
2249 .sqlite()
2250 .load_history(cid, 50)
2251 .await
2252 .unwrap();
2253
2254 assert_eq!(history.len(), 1);
2255 assert_eq!(history[0].role, Role::User);
2256 assert_eq!(history[0].content, content);
2257 assert_eq!(history[0].parts.len(), 1);
2258 match &history[0].parts[0] {
2259 MessagePart::ToolOutput {
2260 tool_name,
2261 body,
2262 compacted_at,
2263 } => {
2264 assert_eq!(tool_name, "shell");
2265 assert_eq!(body, "hello from shell");
2266 assert!(compacted_at.is_none());
2267 }
2268 other => panic!("expected ToolOutput part, got {other:?}"),
2269 }
2270 }
2271
2272 #[tokio::test]
2275 async fn load_history_removes_trailing_orphan_tool_use() {
2276 use zeph_llm::provider::MessagePart;
2277
2278 let provider = mock_provider(vec![]);
2279 let channel = MockChannel::new(vec![]);
2280 let registry = create_test_registry();
2281 let executor = MockToolExecutor::no_tools();
2282
2283 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2284 let cid = memory.sqlite().create_conversation().await.unwrap();
2285 let sqlite = memory.sqlite();
2286
2287 sqlite
2289 .save_message(cid, "user", "do something with a tool")
2290 .await
2291 .unwrap();
2292
2293 let parts = serde_json::to_string(&[MessagePart::ToolUse {
2295 id: "call_orphan".to_string(),
2296 name: "shell".to_string(),
2297 input: serde_json::json!({"command": "ls"}),
2298 }])
2299 .unwrap();
2300 sqlite
2301 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2302 .await
2303 .unwrap();
2304
2305 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2306 std::sync::Arc::new(memory),
2307 cid,
2308 50,
2309 5,
2310 100,
2311 );
2312
2313 let messages_before = agent.msg.messages.len();
2314 agent.load_history().await.unwrap();
2315
2316 assert_eq!(
2318 agent.msg.messages.len(),
2319 messages_before + 1,
2320 "orphaned trailing tool_use must be removed"
2321 );
2322 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2323 }
2324
2325 #[tokio::test]
2326 async fn load_history_removes_leading_orphan_tool_result() {
2327 use zeph_llm::provider::MessagePart;
2328
2329 let provider = mock_provider(vec![]);
2330 let channel = MockChannel::new(vec![]);
2331 let registry = create_test_registry();
2332 let executor = MockToolExecutor::no_tools();
2333
2334 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2335 let cid = memory.sqlite().create_conversation().await.unwrap();
2336 let sqlite = memory.sqlite();
2337
2338 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2340 tool_use_id: "call_missing".to_string(),
2341 content: "result data".to_string(),
2342 is_error: false,
2343 }])
2344 .unwrap();
2345 sqlite
2346 .save_message_with_parts(
2347 cid,
2348 "user",
2349 "[tool_result: call_missing]\nresult data",
2350 &result_parts,
2351 )
2352 .await
2353 .unwrap();
2354
2355 sqlite
2357 .save_message(cid, "assistant", "here is my response")
2358 .await
2359 .unwrap();
2360
2361 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2362 std::sync::Arc::new(memory),
2363 cid,
2364 50,
2365 5,
2366 100,
2367 );
2368
2369 let messages_before = agent.msg.messages.len();
2370 agent.load_history().await.unwrap();
2371
2372 assert_eq!(
2374 agent.msg.messages.len(),
2375 messages_before + 1,
2376 "orphaned leading tool_result must be removed"
2377 );
2378 assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2379 }
2380
2381 #[tokio::test]
2382 async fn load_history_preserves_complete_tool_pairs() {
2383 use zeph_llm::provider::MessagePart;
2384
2385 let provider = mock_provider(vec![]);
2386 let channel = MockChannel::new(vec![]);
2387 let registry = create_test_registry();
2388 let executor = MockToolExecutor::no_tools();
2389
2390 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2391 let cid = memory.sqlite().create_conversation().await.unwrap();
2392 let sqlite = memory.sqlite();
2393
2394 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2396 id: "call_ok".to_string(),
2397 name: "shell".to_string(),
2398 input: serde_json::json!({"command": "pwd"}),
2399 }])
2400 .unwrap();
2401 sqlite
2402 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2403 .await
2404 .unwrap();
2405
2406 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2407 tool_use_id: "call_ok".to_string(),
2408 content: "/home/user".to_string(),
2409 is_error: false,
2410 }])
2411 .unwrap();
2412 sqlite
2413 .save_message_with_parts(
2414 cid,
2415 "user",
2416 "[tool_result: call_ok]\n/home/user",
2417 &result_parts,
2418 )
2419 .await
2420 .unwrap();
2421
2422 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2423 std::sync::Arc::new(memory),
2424 cid,
2425 50,
2426 5,
2427 100,
2428 );
2429
2430 let messages_before = agent.msg.messages.len();
2431 agent.load_history().await.unwrap();
2432
2433 assert_eq!(
2435 agent.msg.messages.len(),
2436 messages_before + 2,
2437 "complete tool_use/tool_result pair must be preserved"
2438 );
2439 assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2440 assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2441 }
2442
2443 #[tokio::test]
2444 async fn load_history_handles_multiple_trailing_orphans() {
2445 use zeph_llm::provider::MessagePart;
2446
2447 let provider = mock_provider(vec![]);
2448 let channel = MockChannel::new(vec![]);
2449 let registry = create_test_registry();
2450 let executor = MockToolExecutor::no_tools();
2451
2452 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2453 let cid = memory.sqlite().create_conversation().await.unwrap();
2454 let sqlite = memory.sqlite();
2455
2456 sqlite.save_message(cid, "user", "start").await.unwrap();
2458
2459 let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2461 id: "call_1".to_string(),
2462 name: "shell".to_string(),
2463 input: serde_json::json!({}),
2464 }])
2465 .unwrap();
2466 sqlite
2467 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2468 .await
2469 .unwrap();
2470
2471 let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2473 id: "call_2".to_string(),
2474 name: "read_file".to_string(),
2475 input: serde_json::json!({}),
2476 }])
2477 .unwrap();
2478 sqlite
2479 .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2480 .await
2481 .unwrap();
2482
2483 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2484 std::sync::Arc::new(memory),
2485 cid,
2486 50,
2487 5,
2488 100,
2489 );
2490
2491 let messages_before = agent.msg.messages.len();
2492 agent.load_history().await.unwrap();
2493
2494 assert_eq!(
2496 agent.msg.messages.len(),
2497 messages_before + 1,
2498 "all trailing orphaned tool_use messages must be removed"
2499 );
2500 assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2501 }
2502
2503 #[tokio::test]
2504 async fn load_history_no_tool_messages_unchanged() {
2505 let provider = mock_provider(vec![]);
2506 let channel = MockChannel::new(vec![]);
2507 let registry = create_test_registry();
2508 let executor = MockToolExecutor::no_tools();
2509
2510 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2511 let cid = memory.sqlite().create_conversation().await.unwrap();
2512 let sqlite = memory.sqlite();
2513
2514 sqlite.save_message(cid, "user", "hello").await.unwrap();
2515 sqlite
2516 .save_message(cid, "assistant", "hi there")
2517 .await
2518 .unwrap();
2519 sqlite
2520 .save_message(cid, "user", "how are you?")
2521 .await
2522 .unwrap();
2523
2524 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2525 std::sync::Arc::new(memory),
2526 cid,
2527 50,
2528 5,
2529 100,
2530 );
2531
2532 let messages_before = agent.msg.messages.len();
2533 agent.load_history().await.unwrap();
2534
2535 assert_eq!(
2537 agent.msg.messages.len(),
2538 messages_before + 3,
2539 "plain messages without tool parts must pass through unchanged"
2540 );
2541 }
2542
2543 #[tokio::test]
2544 async fn load_history_removes_both_leading_and_trailing_orphans() {
2545 use zeph_llm::provider::MessagePart;
2546
2547 let provider = mock_provider(vec![]);
2548 let channel = MockChannel::new(vec![]);
2549 let registry = create_test_registry();
2550 let executor = MockToolExecutor::no_tools();
2551
2552 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2553 let cid = memory.sqlite().create_conversation().await.unwrap();
2554 let sqlite = memory.sqlite();
2555
2556 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2558 tool_use_id: "call_leading".to_string(),
2559 content: "orphaned result".to_string(),
2560 is_error: false,
2561 }])
2562 .unwrap();
2563 sqlite
2564 .save_message_with_parts(
2565 cid,
2566 "user",
2567 "[tool_result: call_leading]\norphaned result",
2568 &result_parts,
2569 )
2570 .await
2571 .unwrap();
2572
2573 sqlite
2575 .save_message(cid, "user", "what is 2+2?")
2576 .await
2577 .unwrap();
2578 sqlite.save_message(cid, "assistant", "4").await.unwrap();
2579
2580 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2582 id: "call_trailing".to_string(),
2583 name: "shell".to_string(),
2584 input: serde_json::json!({"command": "date"}),
2585 }])
2586 .unwrap();
2587 sqlite
2588 .save_message_with_parts(
2589 cid,
2590 "assistant",
2591 "[tool_use: shell(call_trailing)]",
2592 &use_parts,
2593 )
2594 .await
2595 .unwrap();
2596
2597 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2598 std::sync::Arc::new(memory),
2599 cid,
2600 50,
2601 5,
2602 100,
2603 );
2604
2605 let messages_before = agent.msg.messages.len();
2606 agent.load_history().await.unwrap();
2607
2608 assert_eq!(
2610 agent.msg.messages.len(),
2611 messages_before + 2,
2612 "both leading and trailing orphans must be removed"
2613 );
2614 assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2615 assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2616 assert_eq!(
2617 agent.msg.messages[messages_before + 1].role,
2618 Role::Assistant
2619 );
2620 assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2621 }
2622
2623 #[tokio::test]
2628 async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2629 use zeph_llm::provider::MessagePart;
2630
2631 let provider = mock_provider(vec![]);
2632 let channel = MockChannel::new(vec![]);
2633 let registry = create_test_registry();
2634 let executor = MockToolExecutor::no_tools();
2635
2636 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2637 let cid = memory.sqlite().create_conversation().await.unwrap();
2638 let sqlite = memory.sqlite();
2639
2640 sqlite
2642 .save_message(cid, "user", "first question")
2643 .await
2644 .unwrap();
2645 sqlite
2646 .save_message(cid, "assistant", "first answer")
2647 .await
2648 .unwrap();
2649
2650 let use_parts = serde_json::to_string(&[
2654 MessagePart::ToolUse {
2655 id: "call_mid_1".to_string(),
2656 name: "shell".to_string(),
2657 input: serde_json::json!({"command": "ls"}),
2658 },
2659 MessagePart::Text {
2660 text: "Let me check the files.".to_string(),
2661 },
2662 ])
2663 .unwrap();
2664 sqlite
2665 .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2666 .await
2667 .unwrap();
2668
2669 sqlite
2671 .save_message(cid, "user", "second question")
2672 .await
2673 .unwrap();
2674 sqlite
2675 .save_message(cid, "assistant", "second answer")
2676 .await
2677 .unwrap();
2678
2679 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2680 std::sync::Arc::new(memory),
2681 cid,
2682 50,
2683 5,
2684 100,
2685 );
2686
2687 let messages_before = agent.msg.messages.len();
2688 agent.load_history().await.unwrap();
2689
2690 assert_eq!(
2693 agent.msg.messages.len(),
2694 messages_before + 5,
2695 "message count must be 5 (orphan message kept — has text content)"
2696 );
2697
2698 let orphan = &agent.msg.messages[messages_before + 2];
2700 assert_eq!(orphan.role, Role::Assistant);
2701 assert!(
2702 !orphan
2703 .parts
2704 .iter()
2705 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2706 "orphaned ToolUse parts must be stripped from mid-history message"
2707 );
2708 assert!(
2710 orphan.parts.iter().any(
2711 |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2712 ),
2713 "text content of orphaned assistant message must be preserved"
2714 );
2715 }
2716
2717 #[tokio::test]
2722 async fn load_history_keeps_tool_only_user_message() {
2723 use zeph_llm::provider::MessagePart;
2724
2725 let provider = mock_provider(vec![]);
2726 let channel = MockChannel::new(vec![]);
2727 let registry = create_test_registry();
2728 let executor = MockToolExecutor::no_tools();
2729
2730 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2731 let cid = memory.sqlite().create_conversation().await.unwrap();
2732 let sqlite = memory.sqlite();
2733
2734 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2736 id: "call_rc3".to_string(),
2737 name: "memory_save".to_string(),
2738 input: serde_json::json!({"content": "something"}),
2739 }])
2740 .unwrap();
2741 sqlite
2742 .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2743 .await
2744 .unwrap();
2745
2746 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2748 tool_use_id: "call_rc3".to_string(),
2749 content: "saved".to_string(),
2750 is_error: false,
2751 }])
2752 .unwrap();
2753 sqlite
2754 .save_message_with_parts(cid, "user", "", &result_parts)
2755 .await
2756 .unwrap();
2757
2758 sqlite.save_message(cid, "assistant", "done").await.unwrap();
2759
2760 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2761 std::sync::Arc::new(memory),
2762 cid,
2763 50,
2764 5,
2765 100,
2766 );
2767
2768 let messages_before = agent.msg.messages.len();
2769 agent.load_history().await.unwrap();
2770
2771 assert_eq!(
2774 agent.msg.messages.len(),
2775 messages_before + 3,
2776 "user message with empty content but ToolResult parts must not be dropped"
2777 );
2778
2779 let user_msg = &agent.msg.messages[messages_before + 1];
2781 assert_eq!(user_msg.role, Role::User);
2782 assert!(
2783 user_msg.parts.iter().any(
2784 |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2785 ),
2786 "ToolResult part must be preserved on user message with empty content"
2787 );
2788 }
2789
2790 #[tokio::test]
2794 async fn strip_orphans_removes_orphaned_tool_result() {
2795 use zeph_llm::provider::MessagePart;
2796
2797 let provider = mock_provider(vec![]);
2798 let channel = MockChannel::new(vec![]);
2799 let registry = create_test_registry();
2800 let executor = MockToolExecutor::no_tools();
2801
2802 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2803 let cid = memory.sqlite().create_conversation().await.unwrap();
2804 let sqlite = memory.sqlite();
2805
2806 sqlite.save_message(cid, "user", "hello").await.unwrap();
2808 sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2809
2810 sqlite
2812 .save_message(cid, "assistant", "plain answer")
2813 .await
2814 .unwrap();
2815
2816 let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2818 tool_use_id: "call_nonexistent".to_string(),
2819 content: "stale result".to_string(),
2820 is_error: false,
2821 }])
2822 .unwrap();
2823 sqlite
2824 .save_message_with_parts(
2825 cid,
2826 "user",
2827 "[tool_result: call_nonexistent]\nstale result",
2828 &orphan_result_parts,
2829 )
2830 .await
2831 .unwrap();
2832
2833 sqlite
2834 .save_message(cid, "assistant", "final")
2835 .await
2836 .unwrap();
2837
2838 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2839 std::sync::Arc::new(memory),
2840 cid,
2841 50,
2842 5,
2843 100,
2844 );
2845
2846 let messages_before = agent.msg.messages.len();
2847 agent.load_history().await.unwrap();
2848
2849 let loaded = &agent.msg.messages[messages_before..];
2853 for msg in loaded {
2854 assert!(
2855 !msg.parts.iter().any(|p| matches!(
2856 p,
2857 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2858 )),
2859 "orphaned ToolResult part must be stripped from history"
2860 );
2861 }
2862 }
2863
2864 #[tokio::test]
2867 async fn strip_orphans_keeps_complete_pair() {
2868 use zeph_llm::provider::MessagePart;
2869
2870 let provider = mock_provider(vec![]);
2871 let channel = MockChannel::new(vec![]);
2872 let registry = create_test_registry();
2873 let executor = MockToolExecutor::no_tools();
2874
2875 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2876 let cid = memory.sqlite().create_conversation().await.unwrap();
2877 let sqlite = memory.sqlite();
2878
2879 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2880 id: "call_valid".to_string(),
2881 name: "shell".to_string(),
2882 input: serde_json::json!({"command": "ls"}),
2883 }])
2884 .unwrap();
2885 sqlite
2886 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2887 .await
2888 .unwrap();
2889
2890 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2891 tool_use_id: "call_valid".to_string(),
2892 content: "file.rs".to_string(),
2893 is_error: false,
2894 }])
2895 .unwrap();
2896 sqlite
2897 .save_message_with_parts(cid, "user", "", &result_parts)
2898 .await
2899 .unwrap();
2900
2901 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2902 std::sync::Arc::new(memory),
2903 cid,
2904 50,
2905 5,
2906 100,
2907 );
2908
2909 let messages_before = agent.msg.messages.len();
2910 agent.load_history().await.unwrap();
2911
2912 assert_eq!(
2913 agent.msg.messages.len(),
2914 messages_before + 2,
2915 "complete tool_use/tool_result pair must be preserved"
2916 );
2917
2918 let user_msg = &agent.msg.messages[messages_before + 1];
2919 assert!(
2920 user_msg.parts.iter().any(|p| matches!(
2921 p,
2922 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2923 )),
2924 "ToolResult part for a matched tool_use must not be stripped"
2925 );
2926 }
2927
2928 #[tokio::test]
2931 async fn strip_orphans_mixed_history() {
2932 use zeph_llm::provider::MessagePart;
2933
2934 let provider = mock_provider(vec![]);
2935 let channel = MockChannel::new(vec![]);
2936 let registry = create_test_registry();
2937 let executor = MockToolExecutor::no_tools();
2938
2939 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2940 let cid = memory.sqlite().create_conversation().await.unwrap();
2941 let sqlite = memory.sqlite();
2942
2943 let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2945 id: "call_good".to_string(),
2946 name: "shell".to_string(),
2947 input: serde_json::json!({"command": "pwd"}),
2948 }])
2949 .unwrap();
2950 sqlite
2951 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2952 .await
2953 .unwrap();
2954
2955 let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2956 tool_use_id: "call_good".to_string(),
2957 content: "/home".to_string(),
2958 is_error: false,
2959 }])
2960 .unwrap();
2961 sqlite
2962 .save_message_with_parts(cid, "user", "", &result_parts_ok)
2963 .await
2964 .unwrap();
2965
2966 sqlite
2968 .save_message(cid, "assistant", "text only")
2969 .await
2970 .unwrap();
2971
2972 let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2973 tool_use_id: "call_ghost".to_string(),
2974 content: "ghost result".to_string(),
2975 is_error: false,
2976 }])
2977 .unwrap();
2978 sqlite
2979 .save_message_with_parts(
2980 cid,
2981 "user",
2982 "[tool_result: call_ghost]\nghost result",
2983 &orphan_parts,
2984 )
2985 .await
2986 .unwrap();
2987
2988 sqlite
2989 .save_message(cid, "assistant", "final reply")
2990 .await
2991 .unwrap();
2992
2993 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2994 std::sync::Arc::new(memory),
2995 cid,
2996 50,
2997 5,
2998 100,
2999 );
3000
3001 let messages_before = agent.msg.messages.len();
3002 agent.load_history().await.unwrap();
3003
3004 let loaded = &agent.msg.messages[messages_before..];
3005
3006 for msg in loaded {
3008 assert!(
3009 !msg.parts.iter().any(|p| matches!(
3010 p,
3011 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
3012 )),
3013 "orphaned ToolResult (call_ghost) must be stripped from history"
3014 );
3015 }
3016
3017 let has_good_result = loaded.iter().any(|msg| {
3020 msg.role == Role::User
3021 && msg.parts.iter().any(|p| {
3022 matches!(
3023 p,
3024 MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
3025 )
3026 })
3027 });
3028 assert!(
3029 has_good_result,
3030 "matched ToolResult (call_good) must be preserved in history"
3031 );
3032 }
3033
3034 #[tokio::test]
3037 async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
3038 use zeph_llm::provider::MessagePart;
3039
3040 let provider = mock_provider(vec![]);
3041 let channel = MockChannel::new(vec![]);
3042 let registry = create_test_registry();
3043 let executor = MockToolExecutor::no_tools();
3044
3045 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3046 let cid = memory.sqlite().create_conversation().await.unwrap();
3047 let sqlite = memory.sqlite();
3048
3049 sqlite
3050 .save_message(cid, "user", "run a command")
3051 .await
3052 .unwrap();
3053
3054 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3056 id: "call_ok".to_string(),
3057 name: "shell".to_string(),
3058 input: serde_json::json!({"command": "echo hi"}),
3059 }])
3060 .unwrap();
3061 sqlite
3062 .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
3063 .await
3064 .unwrap();
3065
3066 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3068 tool_use_id: "call_ok".to_string(),
3069 content: "hi".to_string(),
3070 is_error: false,
3071 }])
3072 .unwrap();
3073 sqlite
3074 .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
3075 .await
3076 .unwrap();
3077
3078 sqlite.save_message(cid, "assistant", "done").await.unwrap();
3079
3080 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3081 std::sync::Arc::new(memory),
3082 cid,
3083 50,
3084 5,
3085 100,
3086 );
3087
3088 let messages_before = agent.msg.messages.len();
3089 agent.load_history().await.unwrap();
3090
3091 assert_eq!(
3093 agent.msg.messages.len(),
3094 messages_before + 4,
3095 "matched tool pair must not be removed"
3096 );
3097 let tool_msg = &agent.msg.messages[messages_before + 1];
3098 assert!(
3099 tool_msg
3100 .parts
3101 .iter()
3102 .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
3103 "matched ToolUse parts must be preserved"
3104 );
3105 }
3106
3107 #[tokio::test]
3111 async fn persist_cancelled_tool_results_pairs_tool_use() {
3112 use zeph_llm::provider::MessagePart;
3113
3114 let provider = mock_provider(vec![]);
3115 let channel = MockChannel::new(vec![]);
3116 let registry = create_test_registry();
3117 let executor = MockToolExecutor::no_tools();
3118
3119 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3120 let cid = memory.sqlite().create_conversation().await.unwrap();
3121
3122 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3123 std::sync::Arc::new(memory),
3124 cid,
3125 50,
3126 5,
3127 100,
3128 );
3129
3130 let tool_calls = vec![
3132 zeph_llm::provider::ToolUseRequest {
3133 id: "cancel_id_1".to_string(),
3134 name: "shell".to_string().into(),
3135 input: serde_json::json!({}),
3136 },
3137 zeph_llm::provider::ToolUseRequest {
3138 id: "cancel_id_2".to_string(),
3139 name: "read_file".to_string().into(),
3140 input: serde_json::json!({}),
3141 },
3142 ];
3143
3144 agent.persist_cancelled_tool_results(&tool_calls).await;
3145
3146 let history = agent
3147 .memory_state
3148 .persistence
3149 .memory
3150 .as_ref()
3151 .unwrap()
3152 .sqlite()
3153 .load_history(cid, 50)
3154 .await
3155 .unwrap();
3156
3157 assert_eq!(history.len(), 1);
3159 assert_eq!(history[0].role, Role::User);
3160
3161 for tc in &tool_calls {
3163 assert!(
3164 history[0].parts.iter().any(|p| matches!(
3165 p,
3166 MessagePart::ToolResult { tool_use_id, is_error, .. }
3167 if tool_use_id == &tc.id && *is_error
3168 )),
3169 "tombstone ToolResult for {} must be present and is_error=true",
3170 tc.id
3171 );
3172 }
3173 }
3174
/// An empty string has no meaningful content.
#[test]
fn meaningful_content_empty_string() {
    let input = "";
    assert!(!has_meaningful_content(input));
}
3181
/// Spaces, newlines and tabs alone do not count as meaningful content.
#[test]
fn meaningful_content_whitespace_only() {
    let input = " \n\t ";
    assert!(!has_meaningful_content(input));
}
3186
/// A lone legacy `[tool_use: name(id)]` tag is not meaningful content.
#[test]
fn meaningful_content_tool_use_only() {
    let input = "[tool_use: shell(call_1)]";
    assert!(!has_meaningful_content(input));
}
3191
/// A `[tool_use: name]` tag without a parenthesised id is still just a tag.
#[test]
fn meaningful_content_tool_use_no_parens() {
    let input = "[tool_use: memory_save]";
    assert!(!has_meaningful_content(input));
}
3197
/// A `[tool_result: id]` header followed by its output body is not meaningful content.
#[test]
fn meaningful_content_tool_result_with_body() {
    let input = "[tool_result: call_1]\nsome output here";
    assert!(!has_meaningful_content(input));
}
3204
/// A `[tool_result: id]` header with an empty body is not meaningful content.
#[test]
fn meaningful_content_tool_result_empty_body() {
    let input = "[tool_result: call_1]\n";
    assert!(!has_meaningful_content(input));
}
3209
/// An inline `[tool output: name] ...` summary is not meaningful content.
#[test]
fn meaningful_content_tool_output_inline() {
    let input = "[tool output: bash] some result";
    assert!(!has_meaningful_content(input));
}
3214
/// A pruned `[tool output: name] (pruned)` marker is not meaningful content.
#[test]
fn meaningful_content_tool_output_pruned() {
    let input = "[tool output: bash] (pruned)";
    assert!(!has_meaningful_content(input));
}
3219
/// A `[tool output: name]` header followed by a fenced code block is not meaningful content.
#[test]
fn meaningful_content_tool_output_fenced() {
    let input = "[tool output: bash]\n```\nls output\n```";
    assert!(!has_meaningful_content(input));
}
3226
/// Two back-to-back `[tool_use: ...]` tags with no separator are not meaningful content.
#[test]
fn meaningful_content_multiple_tool_use_tags() {
    let input = "[tool_use: bash(id1)][tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3233
/// Two `[tool_use: ...]` tags separated only by a space are not meaningful content.
#[test]
fn meaningful_content_multiple_tool_use_tags_space_separator() {
    let input = "[tool_use: bash(id1)] [tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3241
/// Two `[tool_use: ...]` tags separated only by a newline are not meaningful content.
#[test]
fn meaningful_content_multiple_tool_use_tags_newline_separator() {
    let input = "[tool_use: bash(id1)]\n[tool_use: read(id2)]";
    assert!(!has_meaningful_content(input));
}
3249
/// A tool result block immediately followed by another tool call tag is not meaningful content.
#[test]
fn meaningful_content_tool_result_followed_by_tool_use() {
    let input = "[tool_result: call_1]\nresult\n[tool_use: bash(call_2)]";
    assert!(!has_meaningful_content(input));
}
3257
/// Ordinary prose with no tool tags is meaningful content.
#[test]
fn meaningful_content_real_text_only() {
    let input = "Hello, how can I help you?";
    assert!(has_meaningful_content(input));
}
3262
/// Prose placed before a `[tool_use: ...]` tag keeps the message meaningful.
#[test]
fn meaningful_content_text_before_tool_tag() {
    let input = "Let me check. [tool_use: bash(id)]";
    assert!(has_meaningful_content(input));
}
3267
/// Prose placed after a `[tool_use: ...]` tag keeps the message meaningful.
#[test]
fn meaningful_content_text_after_tool_use_tag() {
    let input = "[tool_use: bash] I ran the command";
    assert!(has_meaningful_content(input));
}
3275
/// Prose sitting between two `[tool_use: ...]` tags keeps the message meaningful.
#[test]
fn meaningful_content_text_between_tags() {
    let input = "[tool_use: bash(id1)]\nand then\n[tool_use: read(id2)]";
    assert!(has_meaningful_content(input));
}
3282
/// An unterminated `[tool_use: ` prefix is not recognised as a tag, so it counts as text.
#[test]
fn meaningful_content_malformed_tag_no_closing_bracket() {
    let input = "[tool_use: ";
    assert!(has_meaningful_content(input));
}
3288
/// A tool call tag followed by its result block, with nothing else, is not meaningful content.
#[test]
fn meaningful_content_tool_use_and_tool_result_only() {
    let input = "[tool_use: memory_save(call_abc)]\n[tool_result: call_abc]\nsaved";
    assert!(!has_meaningful_content(input));
}
3296
/// Checks that a JSON array in a tool result body is not mistaken for text.
///
/// The body starts with `[`, but it is still part of the result block, so the input is
/// not meaningful content.
#[test]
fn meaningful_content_tool_result_body_with_json_array() {
    let input = "[tool_result: id1]\n[\"array\", \"value\"]";
    assert!(!has_meaningful_content(input));
}
3303
3304 #[tokio::test]
3315 async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
3316 use zeph_llm::provider::MessagePart;
3317
3318 let provider = mock_provider(vec![]);
3319 let channel = MockChannel::new(vec![]);
3320 let registry = create_test_registry();
3321 let executor = MockToolExecutor::no_tools();
3322
3323 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3324 let cid = memory.sqlite().create_conversation().await.unwrap();
3325 let sqlite = memory.sqlite();
3326
3327 sqlite
3329 .save_message(cid, "user", "save this for me")
3330 .await
3331 .unwrap();
3332
3333 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3336 id: "call_2529".to_string(),
3337 name: "memory_save".to_string(),
3338 input: serde_json::json!({"content": "save this"}),
3339 }])
3340 .unwrap();
3341 let orphan_assistant_id = sqlite
3342 .save_message_with_parts(
3343 cid,
3344 "assistant",
3345 "[tool_use: memory_save(call_2529)]",
3346 &use_parts,
3347 )
3348 .await
3349 .unwrap();
3350
3351 sqlite
3356 .save_message(cid, "assistant", "here is a plain reply")
3357 .await
3358 .unwrap();
3359
3360 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3361 tool_use_id: "call_2529".to_string(),
3362 content: "saved".to_string(),
3363 is_error: false,
3364 }])
3365 .unwrap();
3366 let orphan_user_id = sqlite
3367 .save_message_with_parts(
3368 cid,
3369 "user",
3370 "[tool_result: call_2529]\nsaved",
3371 &result_parts,
3372 )
3373 .await
3374 .unwrap();
3375
3376 sqlite.save_message(cid, "assistant", "done").await.unwrap();
3378
3379 let memory_arc = std::sync::Arc::new(memory);
3380 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3381 memory_arc.clone(),
3382 cid,
3383 50,
3384 5,
3385 100,
3386 );
3387
3388 agent.load_history().await.unwrap();
3389
3390 let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3393 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3394 )
3395 .bind(orphan_assistant_id)
3396 .fetch_all(memory_arc.sqlite().pool())
3397 .await
3398 .unwrap();
3399
3400 let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3401 "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3402 )
3403 .bind(orphan_user_id)
3404 .fetch_all(memory_arc.sqlite().pool())
3405 .await
3406 .unwrap();
3407
3408 assert_eq!(
3409 assistant_deleted_count.first().copied().unwrap_or(0),
3410 1,
3411 "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3412 );
3413 assert_eq!(
3414 user_deleted_count.first().copied().unwrap_or(0),
3415 1,
3416 "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3417 );
3418 }
3419
3420 #[tokio::test]
3424 async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3425 use zeph_llm::provider::MessagePart;
3426
3427 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3428 let cid = memory.sqlite().create_conversation().await.unwrap();
3429 let sqlite = memory.sqlite();
3430
3431 sqlite
3433 .save_message(cid, "user", "do something")
3434 .await
3435 .unwrap();
3436
3437 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3439 id: "call_idem".to_string(),
3440 name: "shell".to_string(),
3441 input: serde_json::json!({"command": "ls"}),
3442 }])
3443 .unwrap();
3444 sqlite
3445 .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3446 .await
3447 .unwrap();
3448
3449 sqlite
3451 .save_message(cid, "assistant", "continuing")
3452 .await
3453 .unwrap();
3454
3455 let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3457 tool_use_id: "call_idem".to_string(),
3458 content: "output".to_string(),
3459 is_error: false,
3460 }])
3461 .unwrap();
3462 sqlite
3463 .save_message_with_parts(
3464 cid,
3465 "user",
3466 "[tool_result: call_idem]\noutput",
3467 &result_parts,
3468 )
3469 .await
3470 .unwrap();
3471
3472 sqlite
3473 .save_message(cid, "assistant", "final")
3474 .await
3475 .unwrap();
3476
3477 let memory_arc = std::sync::Arc::new(memory);
3478
3479 let mut agent1 = Agent::new(
3481 mock_provider(vec![]),
3482 MockChannel::new(vec![]),
3483 create_test_registry(),
3484 None,
3485 5,
3486 MockToolExecutor::no_tools(),
3487 )
3488 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3489 agent1.load_history().await.unwrap();
3490 let count_after_first = agent1.msg.messages.len();
3491
3492 let mut agent2 = Agent::new(
3495 mock_provider(vec![]),
3496 MockChannel::new(vec![]),
3497 create_test_registry(),
3498 None,
3499 5,
3500 MockToolExecutor::no_tools(),
3501 )
3502 .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3503 agent2.load_history().await.unwrap();
3504 let count_after_second = agent2.msg.messages.len();
3505
3506 assert_eq!(
3508 count_after_first, count_after_second,
3509 "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3510 );
3511 }
3512
3513 #[tokio::test]
3517 async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3518 use zeph_llm::provider::MessagePart;
3519
3520 let provider = mock_provider(vec![]);
3521 let channel = MockChannel::new(vec![]);
3522 let registry = create_test_registry();
3523 let executor = MockToolExecutor::no_tools();
3524
3525 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3526 let cid = memory.sqlite().create_conversation().await.unwrap();
3527 let sqlite = memory.sqlite();
3528
3529 sqlite
3531 .save_message(cid, "user", "check the files")
3532 .await
3533 .unwrap();
3534
3535 let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3538 id: "call_mixed".to_string(),
3539 name: "shell".to_string(),
3540 input: serde_json::json!({"command": "ls"}),
3541 }])
3542 .unwrap();
3543 sqlite
3544 .save_message_with_parts(
3545 cid,
3546 "assistant",
3547 "Let me list the directory. [tool_use: shell(call_mixed)]",
3548 &use_parts,
3549 )
3550 .await
3551 .unwrap();
3552
3553 sqlite.save_message(cid, "user", "thanks").await.unwrap();
3555 sqlite
3556 .save_message(cid, "assistant", "you are welcome")
3557 .await
3558 .unwrap();
3559
3560 let memory_arc = std::sync::Arc::new(memory);
3561 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3562 memory_arc.clone(),
3563 cid,
3564 50,
3565 5,
3566 100,
3567 );
3568
3569 let messages_before = agent.msg.messages.len();
3570 agent.load_history().await.unwrap();
3571
3572 assert_eq!(
3574 agent.msg.messages.len(),
3575 messages_before + 4,
3576 "assistant message with text + tool tag must not be removed after ToolUse strip"
3577 );
3578
3579 let mixed_msg = agent
3581 .msg
3582 .messages
3583 .iter()
3584 .find(|m| m.content.contains("Let me list the directory"))
3585 .expect("mixed-content assistant message must still be in history");
3586 assert!(
3587 !mixed_msg
3588 .parts
3589 .iter()
3590 .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3591 "orphaned ToolUse parts must be stripped even when message has meaningful text"
3592 );
3593 assert_eq!(
3594 mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3595 "content field must be unchanged — only parts are stripped"
3596 );
3597 }
3598
3599 #[tokio::test]
3602 async fn persist_message_skipped_tool_result_does_not_embed() {
3603 use zeph_llm::provider::MessagePart;
3604
3605 let provider = mock_provider(vec![]);
3606 let channel = MockChannel::new(vec![]);
3607 let registry = create_test_registry();
3608 let executor = MockToolExecutor::no_tools();
3609
3610 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3611 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3612 let cid = memory.sqlite().create_conversation().await.unwrap();
3613
3614 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3615 .with_metrics(tx)
3616 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3617 agent.memory_state.persistence.autosave_assistant = true;
3618 agent.memory_state.persistence.autosave_min_length = 0;
3619
3620 let parts = vec![MessagePart::ToolResult {
3621 tool_use_id: "tu1".into(),
3622 content: "[skipped] bash tool was blocked by utility gate".into(),
3623 is_error: false,
3624 }];
3625
3626 agent
3627 .persist_message(
3628 Role::User,
3629 "[skipped] bash tool was blocked by utility gate",
3630 &parts,
3631 false,
3632 )
3633 .await;
3634
3635 assert_eq!(
3636 rx.borrow().embeddings_generated,
3637 0,
3638 "[skipped] ToolResult must not be embedded into Qdrant"
3639 );
3640 }
3641
3642 #[tokio::test]
3643 async fn persist_message_stopped_tool_result_does_not_embed() {
3644 use zeph_llm::provider::MessagePart;
3645
3646 let provider = mock_provider(vec![]);
3647 let channel = MockChannel::new(vec![]);
3648 let registry = create_test_registry();
3649 let executor = MockToolExecutor::no_tools();
3650
3651 let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3652 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3653 let cid = memory.sqlite().create_conversation().await.unwrap();
3654
3655 let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3656 .with_metrics(tx)
3657 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3658 agent.memory_state.persistence.autosave_assistant = true;
3659 agent.memory_state.persistence.autosave_min_length = 0;
3660
3661 let parts = vec![MessagePart::ToolResult {
3662 tool_use_id: "tu2".into(),
3663 content: "[stopped] execution limit reached".into(),
3664 is_error: false,
3665 }];
3666
3667 agent
3668 .persist_message(
3669 Role::User,
3670 "[stopped] execution limit reached",
3671 &parts,
3672 false,
3673 )
3674 .await;
3675
3676 assert_eq!(
3677 rx.borrow().embeddings_generated,
3678 0,
3679 "[stopped] ToolResult must not be embedded into Qdrant"
3680 );
3681 }
3682
3683 #[tokio::test]
3684 async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3685 use zeph_llm::provider::MessagePart;
3688
3689 let provider = mock_provider(vec![]);
3690 let channel = MockChannel::new(vec![]);
3691 let registry = create_test_registry();
3692 let executor = MockToolExecutor::no_tools();
3693
3694 let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3695 let cid = memory.sqlite().create_conversation().await.unwrap();
3696 let memory_arc = std::sync::Arc::new(memory);
3697
3698 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3699 memory_arc.clone(),
3700 cid,
3701 50,
3702 5,
3703 100,
3704 );
3705 agent.memory_state.persistence.autosave_assistant = true;
3706 agent.memory_state.persistence.autosave_min_length = 0;
3707
3708 let content = "total 42\ndrwxr-xr-x 5 user group";
3709 let parts = vec![MessagePart::ToolResult {
3710 tool_use_id: "tu3".into(),
3711 content: content.into(),
3712 is_error: false,
3713 }];
3714
3715 agent
3716 .persist_message(Role::User, content, &parts, false)
3717 .await;
3718
3719 let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3721 assert_eq!(
3722 history.len(),
3723 1,
3724 "normal ToolResult must be saved to SQLite"
3725 );
3726 assert_eq!(history[0].content, content);
3727 }
3728
/// With more messages than the cap, the tail window holds exactly the last
/// `max_messages` entries.
///
/// NOTE(review): this reproduces the `len.saturating_sub(max)` tail-window
/// arithmetic locally rather than calling the production trajectory
/// extraction code. Confirm it stays in sync with that code.
#[test]
fn trajectory_extraction_slice_bounds_messages() {
    let max_messages: usize = 20;
    let total_messages = 100usize;
    // Each element equals its own index, so the slice contents show exactly
    // which positions were selected.
    let messages: Vec<usize> = (0..total_messages).collect();

    let tail_start = total_messages.saturating_sub(max_messages);
    // Actually slice, so an out-of-range start index would panic here.
    let window = &messages[tail_start..];

    assert_eq!(
        window.len(),
        20,
        "slice should contain exactly max_messages items"
    );
    assert_eq!(tail_start, 80, "slice should start at len - max_messages");
    assert_eq!(
        window.first(),
        Some(&80),
        "window must begin at the oldest retained message"
    );
    assert_eq!(
        window.last(),
        Some(&99),
        "window must end at the most recent message"
    );
}
3748
/// With fewer messages than the cap, the tail window covers the whole history
/// instead of underflowing.
///
/// NOTE(review): like its sibling test, this exercises local arithmetic, not
/// the production trajectory extraction code. Confirm the two agree.
#[test]
fn trajectory_extraction_slice_handles_few_messages() {
    let max_messages: usize = 20;
    let total_messages = 5usize;
    let messages: Vec<usize> = (0..total_messages).collect();

    // `saturating_sub` clamps at 0. A plain `5 - 20` on `usize` would overflow,
    // which panics in debug builds.
    let tail_start = total_messages.saturating_sub(max_messages);
    let window = &messages[tail_start..];

    assert_eq!(
        window.len(),
        5,
        "should return all messages when fewer than max"
    );
    assert_eq!(tail_start, 0, "slice should start from the beginning");
    assert_eq!(
        window,
        messages.as_slice(),
        "window must be the entire history when it is shorter than the cap"
    );
}
3760}