Skip to main content

zeph_core/agent/
persistence.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashSet;
5
6use crate::channel::Channel;
7use zeph_llm::provider::{Message, MessagePart, Role};
8use zeph_memory::sqlite::role_str;
9
10use super::Agent;
11
12/// Remove orphaned `ToolUse`/`ToolResult` messages from restored history.
13///
14/// Four failure modes are handled:
15/// 1. Trailing orphan: the last message is an assistant with `ToolUse` parts but no subsequent
16///    user message with `ToolResult` — caused by LIMIT boundary splits or interrupted sessions.
17/// 2. Leading orphan: the first message is a user with `ToolResult` parts but no preceding
18///    assistant message with `ToolUse` — caused by LIMIT boundary cuts.
19/// 3. Mid-history orphaned `ToolUse`: an assistant message with `ToolUse` parts is not followed
20///    by a user message with matching `ToolResult` parts. The `ToolUse` parts are stripped;
21///    if no content remains the message is removed.
22/// 4. Mid-history orphaned `ToolResult`: a user message has `ToolResult` parts whose
23///    `tool_use_id` is not present in the preceding assistant message. Those `ToolResult` parts
24///    are stripped; if no content remains the message is removed.
25///
26/// Boundary cases are resolved in a loop before the mid-history scan runs.
27fn sanitize_tool_pairs(messages: &mut Vec<Message>) -> usize {
28    let mut removed = 0;
29
30    loop {
31        // Remove trailing orphaned tool_use (assistant message with ToolUse, no following tool_result).
32        if let Some(last) = messages.last()
33            && last.role == Role::Assistant
34            && last
35                .parts
36                .iter()
37                .any(|p| matches!(p, MessagePart::ToolUse { .. }))
38        {
39            let ids: Vec<String> = last
40                .parts
41                .iter()
42                .filter_map(|p| {
43                    if let MessagePart::ToolUse { id, .. } = p {
44                        Some(id.clone())
45                    } else {
46                        None
47                    }
48                })
49                .collect();
50            tracing::warn!(
51                tool_ids = ?ids,
52                "removing orphaned trailing tool_use message from restored history"
53            );
54            messages.pop();
55            removed += 1;
56            continue;
57        }
58
59        // Remove leading orphaned tool_result (user message with ToolResult, no preceding tool_use).
60        if let Some(first) = messages.first()
61            && first.role == Role::User
62            && first
63                .parts
64                .iter()
65                .any(|p| matches!(p, MessagePart::ToolResult { .. }))
66        {
67            let ids: Vec<String> = first
68                .parts
69                .iter()
70                .filter_map(|p| {
71                    if let MessagePart::ToolResult { tool_use_id, .. } = p {
72                        Some(tool_use_id.clone())
73                    } else {
74                        None
75                    }
76                })
77                .collect();
78            tracing::warn!(
79                tool_use_ids = ?ids,
80                "removing orphaned leading tool_result message from restored history"
81            );
82            messages.remove(0);
83            removed += 1;
84            continue;
85        }
86
87        break;
88    }
89
90    // Mid-history scan: strip ToolUse parts from any assistant message whose tool IDs are not
91    // matched by ToolResult parts in the immediately following user message.
92    removed += strip_mid_history_orphans(messages);
93
94    removed
95}
96
97/// Scan all messages and strip orphaned `ToolUse`/`ToolResult` parts from mid-history messages.
98///
99/// Two directions are checked:
100/// - Forward: assistant message has `ToolUse` parts not matched by `ToolResult` in the next user
101///   message — strip those `ToolUse` parts.
102/// - Reverse: user message has `ToolResult` parts whose `tool_use_id` is not present as a
103///   `ToolUse` in the preceding assistant message — strip those `ToolResult` parts.
104///
105/// Text parts are preserved; messages with no remaining content are removed entirely.
106///
107/// Returns the number of messages removed (stripped-to-empty messages count as 1 each).
108/// Collect `tool_use` IDs from `msg` that have no matching `ToolResult` in `next_msg`.
109fn orphaned_tool_use_ids(msg: &Message, next_msg: Option<&Message>) -> HashSet<String> {
110    let matched: HashSet<String> = next_msg
111        .filter(|n| n.role == Role::User)
112        .map(|n| {
113            msg.parts
114                .iter()
115                .filter_map(|p| if let MessagePart::ToolUse { id, .. } = p { Some(id.clone()) } else { None })
116                .filter(|uid| n.parts.iter().any(|np| matches!(np, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == uid)))
117                .collect()
118        })
119        .unwrap_or_default();
120    msg.parts
121        .iter()
122        .filter_map(|p| {
123            if let MessagePart::ToolUse { id, .. } = p
124                && !matched.contains(id)
125            {
126                Some(id.clone())
127            } else {
128                None
129            }
130        })
131        .collect()
132}
133
134/// Collect `tool_result` IDs from `msg` that have no matching `ToolUse` in `prev_msg`.
135fn orphaned_tool_result_ids(msg: &Message, prev_msg: Option<&Message>) -> HashSet<String> {
136    let avail: HashSet<&str> = prev_msg
137        .filter(|p| p.role == Role::Assistant)
138        .map(|p| {
139            p.parts
140                .iter()
141                .filter_map(|part| {
142                    if let MessagePart::ToolUse { id, .. } = part {
143                        Some(id.as_str())
144                    } else {
145                        None
146                    }
147                })
148                .collect()
149        })
150        .unwrap_or_default();
151    msg.parts
152        .iter()
153        .filter_map(|p| {
154            if let MessagePart::ToolResult { tool_use_id, .. } = p
155                && !avail.contains(tool_use_id.as_str())
156            {
157                Some(tool_use_id.clone())
158            } else {
159                None
160            }
161        })
162        .collect()
163}
164
165fn strip_mid_history_orphans(messages: &mut Vec<Message>) -> usize {
166    let mut removed = 0;
167    let mut i = 0;
168    while i < messages.len() {
169        // Forward pass: strip ToolUse parts from assistant messages that lack a matching
170        // ToolResult in the next user message. Only orphaned IDs are stripped — other ToolUse
171        // parts in the same message that DO have a matching ToolResult are preserved.
172        if messages[i].role == Role::Assistant
173            && messages[i]
174                .parts
175                .iter()
176                .any(|p| matches!(p, MessagePart::ToolUse { .. }))
177        {
178            let orphaned_ids = orphaned_tool_use_ids(&messages[i], messages.get(i + 1));
179            if !orphaned_ids.is_empty() {
180                tracing::warn!(
181                    tool_ids = ?orphaned_ids,
182                    index = i,
183                    "stripping orphaned mid-history tool_use parts from assistant message"
184                );
185                messages[i].parts.retain(
186                    |p| !matches!(p, MessagePart::ToolUse { id, .. } if orphaned_ids.contains(id)),
187                );
188                let is_empty =
189                    messages[i].content.trim().is_empty() && messages[i].parts.is_empty();
190                if is_empty {
191                    messages.remove(i);
192                    removed += 1;
193                    continue; // Do not advance i — the next message is now at position i.
194                }
195            }
196        }
197
198        // Reverse pass: user ToolResult without matching ToolUse in the preceding assistant message.
199        if messages[i].role == Role::User
200            && messages[i]
201                .parts
202                .iter()
203                .any(|p| matches!(p, MessagePart::ToolResult { .. }))
204        {
205            let orphaned_ids = orphaned_tool_result_ids(
206                &messages[i],
207                if i > 0 { messages.get(i - 1) } else { None },
208            );
209            if !orphaned_ids.is_empty() {
210                tracing::warn!(
211                    tool_use_ids = ?orphaned_ids,
212                    index = i,
213                    "stripping orphaned mid-history tool_result parts from user message"
214                );
215                messages[i].parts.retain(|p| {
216                    !matches!(p, MessagePart::ToolResult { tool_use_id, .. } if orphaned_ids.contains(tool_use_id.as_str()))
217                });
218
219                let is_empty =
220                    messages[i].content.trim().is_empty() && messages[i].parts.is_empty();
221                if is_empty {
222                    messages.remove(i);
223                    removed += 1;
224                    // Do not advance i — the next message is now at position i.
225                    continue;
226                }
227            }
228        }
229
230        i += 1;
231    }
232    removed
233}
234
235impl<C: Channel> Agent<C> {
236    /// Load conversation history from memory and inject into messages.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if loading history from `SQLite` fails.
241    pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
242        let (Some(memory), Some(cid)) =
243            (&self.memory_state.memory, self.memory_state.conversation_id)
244        else {
245            return Ok(());
246        };
247
248        let history = memory
249            .sqlite()
250            .load_history_filtered(cid, self.memory_state.history_limit, Some(true), None)
251            .await?;
252        if !history.is_empty() {
253            let mut loaded = 0;
254            let mut skipped = 0;
255
256            for msg in history {
257                // Only skip messages that have neither text content nor structured parts.
258                // Native tool calls produce user messages with empty `content` but non-empty
259                // `parts` (containing ToolResult). Skipping them here would orphan the
260                // preceding assistant ToolUse before sanitize_tool_pairs can clean it up.
261                if msg.content.trim().is_empty() && msg.parts.is_empty() {
262                    tracing::warn!("skipping empty message from history (role: {:?})", msg.role);
263                    skipped += 1;
264                    continue;
265                }
266                self.msg.messages.push(msg);
267                loaded += 1;
268            }
269
270            // Determine the start index of just-loaded messages (system prompt is at index 0).
271            let history_start = self.msg.messages.len() - loaded;
272            let mut restored_slice = self.msg.messages.split_off(history_start);
273            let orphans = sanitize_tool_pairs(&mut restored_slice);
274            skipped += orphans;
275            loaded = loaded.saturating_sub(orphans);
276            self.msg.messages.append(&mut restored_slice);
277
278            tracing::info!("restored {loaded} message(s) from conversation {cid}");
279            if skipped > 0 {
280                tracing::warn!("skipped {skipped} empty/orphaned message(s) from history");
281            }
282
283            if loaded > 0 {
284                // Increment session counts so tier promotion can track cross-session access.
285                // Errors are non-fatal — promotion will simply use stale counts.
286                let _ = memory
287                    .sqlite()
288                    .increment_session_counts_for_conversation(cid)
289                    .await
290                    .inspect_err(|e| {
291                        tracing::warn!(error = %e, "failed to increment tier session counts");
292                    });
293            }
294        }
295
296        if let Ok(count) = memory.message_count(cid).await {
297            let count_u64 = u64::try_from(count).unwrap_or(0);
298            self.update_metrics(|m| {
299                m.sqlite_message_count = count_u64;
300            });
301        }
302
303        if let Ok(count) = memory.sqlite().count_semantic_facts().await {
304            let count_u64 = u64::try_from(count).unwrap_or(0);
305            self.update_metrics(|m| {
306                m.semantic_fact_count = count_u64;
307            });
308        }
309
310        if let Ok(count) = memory.unsummarized_message_count(cid).await {
311            self.memory_state.unsummarized_count = usize::try_from(count).unwrap_or(0);
312        }
313
314        self.recompute_prompt_tokens();
315        Ok(())
316    }
317
318    /// Persist a message to memory.
319    ///
320    /// `has_injection_flags` controls whether Qdrant embedding is skipped for this message.
321    /// When `true` and `guard_memory_writes` is enabled, only `SQLite` is written — the message
322    /// is saved for conversation continuity but will not pollute semantic search (M2, D2).
323    pub(crate) async fn persist_message(
324        &mut self,
325        role: Role,
326        content: &str,
327        parts: &[MessagePart],
328        has_injection_flags: bool,
329    ) {
330        let (Some(memory), Some(cid)) =
331            (&self.memory_state.memory, self.memory_state.conversation_id)
332        else {
333            return;
334        };
335
336        let parts_json = if parts.is_empty() {
337            "[]".to_string()
338        } else {
339            serde_json::to_string(parts).unwrap_or_else(|e| {
340                tracing::warn!("failed to serialize message parts, storing empty: {e}");
341                "[]".to_string()
342            })
343        };
344
345        // M2: injection flag is passed explicitly to avoid stale mutable-bool state on Agent.
346        // When has_injection_flags=true, skip embedding to prevent poisoned content from
347        // polluting Qdrant semantic search results.
348        let guard_event = self
349            .security
350            .exfiltration_guard
351            .should_guard_memory_write(has_injection_flags);
352        if let Some(ref event) = guard_event {
353            tracing::warn!(
354                ?event,
355                "exfiltration guard: skipping Qdrant embedding for flagged content"
356            );
357            self.update_metrics(|m| m.exfiltration_memory_guards += 1);
358            self.push_security_event(
359                crate::metrics::SecurityEventCategory::ExfiltrationBlock,
360                "memory_write",
361                "Qdrant embedding skipped: flagged content",
362            );
363        }
364
365        let skip_embedding = guard_event.is_some();
366
367        let should_embed = if skip_embedding {
368            false
369        } else {
370            match role {
371                Role::Assistant => {
372                    self.memory_state.autosave_assistant
373                        && content.len() >= self.memory_state.autosave_min_length
374                }
375                _ => true,
376            }
377        };
378
379        let embedding_stored = if should_embed {
380            match memory
381                .remember_with_parts(cid, role_str(role), content, &parts_json)
382                .await
383            {
384                Ok((message_id, stored)) => {
385                    self.last_persisted_message_id = Some(message_id.0);
386                    stored
387                }
388                Err(e) => {
389                    tracing::error!("failed to persist message: {e:#}");
390                    return;
391                }
392            }
393        } else {
394            match memory
395                .save_only(cid, role_str(role), content, &parts_json)
396                .await
397            {
398                Ok(message_id) => {
399                    self.last_persisted_message_id = Some(message_id.0);
400                    false
401                }
402                Err(e) => {
403                    tracing::error!("failed to persist message: {e:#}");
404                    return;
405                }
406            }
407        };
408
409        self.memory_state.unsummarized_count += 1;
410
411        self.update_metrics(|m| {
412            m.sqlite_message_count += 1;
413            if embedding_stored {
414                m.embeddings_generated += 1;
415            }
416        });
417
418        self.check_summarization().await;
419
420        // FIX-1: skip graph extraction for tool result messages — they contain raw structured
421        // output (TOML, JSON, code) that pollutes the entity graph with noise.
422        let has_tool_result_parts = parts
423            .iter()
424            .any(|p| matches!(p, MessagePart::ToolResult { .. }));
425
426        self.maybe_spawn_graph_extraction(content, has_injection_flags, has_tool_result_parts)
427            .await;
428    }
429
430    async fn maybe_spawn_graph_extraction(
431        &mut self,
432        content: &str,
433        has_injection_flags: bool,
434        has_tool_result_parts: bool,
435    ) {
436        use zeph_memory::semantic::GraphExtractionConfig;
437
438        if self.memory_state.memory.is_none() || self.memory_state.conversation_id.is_none() {
439            return;
440        }
441
442        // FIX-1: skip extraction for tool result messages — raw tool output is structural data,
443        // not conversational content. Extracting entities from it produces graph noise.
444        if has_tool_result_parts {
445            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
446            return;
447        }
448
449        // S2: skip extraction when injection flags detected — content is untrusted LLM input
450        if has_injection_flags {
451            tracing::warn!("graph extraction skipped: injection patterns detected in content");
452            return;
453        }
454
455        // Collect extraction config — borrow ends before send_status call
456        let extraction_cfg = {
457            let cfg = &self.memory_state.graph_config;
458            if !cfg.enabled {
459                return;
460            }
461            GraphExtractionConfig {
462                max_entities: cfg.max_entities_per_message,
463                max_edges: cfg.max_edges_per_message,
464                extraction_timeout_secs: cfg.extraction_timeout_secs,
465                community_refresh_interval: cfg.community_refresh_interval,
466                expired_edge_retention_days: cfg.expired_edge_retention_days,
467                max_entities_cap: cfg.max_entities,
468                community_summary_max_prompt_bytes: cfg.community_summary_max_prompt_bytes,
469                community_summary_concurrency: cfg.community_summary_concurrency,
470                lpa_edge_chunk_size: cfg.lpa_edge_chunk_size,
471                note_linking: zeph_memory::NoteLinkingConfig {
472                    enabled: cfg.note_linking.enabled,
473                    similarity_threshold: cfg.note_linking.similarity_threshold,
474                    top_k: cfg.note_linking.top_k,
475                    timeout_secs: cfg.note_linking.timeout_secs,
476                },
477                link_weight_decay_lambda: cfg.link_weight_decay_lambda,
478                link_weight_decay_interval_secs: cfg.link_weight_decay_interval_secs,
479            }
480        };
481
482        // FIX-2: collect last 4 genuine conversational user messages as context for extraction.
483        // Exclude tool result messages (Role::User with ToolResult parts) — they contain
484        // raw structured output and would pollute the extraction context with noise.
485        let context_messages: Vec<String> = self
486            .msg
487            .messages
488            .iter()
489            .rev()
490            .filter(|m| {
491                m.role == Role::User
492                    && !m
493                        .parts
494                        .iter()
495                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
496            })
497            .take(4)
498            .map(|m| m.content.clone())
499            .collect();
500
501        let _ = self.channel.send_status("saving to graph...").await;
502
503        if let Some(memory) = &self.memory_state.memory {
504            // Build optional validation callback from MemoryWriteValidator (S3 fix).
505            // zeph-memory receives a generic Fn predicate — it does not depend on security types.
506            let validator: zeph_memory::semantic::PostExtractValidator =
507                if self.security.memory_validator.is_enabled() {
508                    let v = self.security.memory_validator.clone();
509                    Some(Box::new(move |result| {
510                        v.validate_graph_extraction(result)
511                            .map_err(|e| e.to_string())
512                    }))
513                } else {
514                    None
515                };
516            let extraction_handle = memory.spawn_graph_extraction(
517                content.to_owned(),
518                context_messages,
519                extraction_cfg,
520                validator,
521            );
522            // After the background extraction completes, refresh graph counts in metrics.
523            // This ensures the TUI panel reflects actual DB counts rather than stale zeros.
524            if let (Some(store), Some(tx)) =
525                (memory.graph_store.clone(), self.metrics.metrics_tx.clone())
526            {
527                let start = self.lifecycle.start_time;
528                tokio::spawn(async move {
529                    let _ = extraction_handle.await;
530                    let (entities, edges, communities) = tokio::join!(
531                        store.entity_count(),
532                        store.active_edge_count(),
533                        store.community_count()
534                    );
535                    let elapsed = start.elapsed().as_secs();
536                    tx.send_modify(|m| {
537                        m.uptime_seconds = elapsed;
538                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
539                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
540                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
541                    });
542                });
543            }
544        }
545        let _ = self.channel.send_status("").await;
546        self.sync_community_detection_failures();
547        self.sync_graph_extraction_metrics();
548        self.sync_graph_counts().await;
549        #[cfg(feature = "compression-guidelines")]
550        self.sync_guidelines_status().await;
551    }
552
553    pub(crate) async fn check_summarization(&mut self) {
554        let (Some(memory), Some(cid)) =
555            (&self.memory_state.memory, self.memory_state.conversation_id)
556        else {
557            return;
558        };
559
560        if self.memory_state.unsummarized_count > self.memory_state.summarization_threshold {
561            let _ = self.channel.send_status("summarizing...").await;
562            let batch_size = self.memory_state.summarization_threshold / 2;
563            match memory.summarize(cid, batch_size).await {
564                Ok(Some(summary_id)) => {
565                    tracing::info!("created summary {summary_id} for conversation {cid}");
566                    self.memory_state.unsummarized_count = 0;
567                    self.update_metrics(|m| {
568                        m.summaries_count += 1;
569                    });
570                }
571                Ok(None) => {
572                    tracing::debug!("no summarization needed");
573                }
574                Err(e) => {
575                    tracing::error!("summarization failed: {e:#}");
576                }
577            }
578            let _ = self.channel.send_status("").await;
579        }
580    }
581}
582
583#[cfg(test)]
584mod tests {
585    use super::super::agent_tests::{
586        MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
587    };
588    use super::*;
589    use zeph_llm::any::AnyProvider;
590    use zeph_memory::semantic::SemanticMemory;
591
592    async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
593        SemanticMemory::new(
594            ":memory:",
595            "http://127.0.0.1:1",
596            provider.clone(),
597            "test-model",
598        )
599        .await
600        .unwrap()
601    }
602
603    #[tokio::test]
604    async fn load_history_without_memory_returns_ok() {
605        let provider = mock_provider(vec![]);
606        let channel = MockChannel::new(vec![]);
607        let registry = create_test_registry();
608        let executor = MockToolExecutor::no_tools();
609        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
610
611        let result = agent.load_history().await;
612        assert!(result.is_ok());
613        // No messages added when no memory is configured
614        assert_eq!(agent.msg.messages.len(), 1); // system prompt only
615    }
616
617    #[tokio::test]
618    async fn load_history_with_messages_injects_into_agent() {
619        let provider = mock_provider(vec![]);
620        let channel = MockChannel::new(vec![]);
621        let registry = create_test_registry();
622        let executor = MockToolExecutor::no_tools();
623
624        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
625        let cid = memory.sqlite().create_conversation().await.unwrap();
626
627        memory
628            .sqlite()
629            .save_message(cid, "user", "hello from history")
630            .await
631            .unwrap();
632        memory
633            .sqlite()
634            .save_message(cid, "assistant", "hi back")
635            .await
636            .unwrap();
637
638        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
639            std::sync::Arc::new(memory),
640            cid,
641            50,
642            5,
643            100,
644        );
645
646        let messages_before = agent.msg.messages.len();
647        agent.load_history().await.unwrap();
648        // Two messages were added from history
649        assert_eq!(agent.msg.messages.len(), messages_before + 2);
650    }
651
652    #[tokio::test]
653    async fn load_history_skips_empty_messages() {
654        let provider = mock_provider(vec![]);
655        let channel = MockChannel::new(vec![]);
656        let registry = create_test_registry();
657        let executor = MockToolExecutor::no_tools();
658
659        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
660        let cid = memory.sqlite().create_conversation().await.unwrap();
661
662        // Save an empty message (should be skipped) and a valid one
663        memory
664            .sqlite()
665            .save_message(cid, "user", "   ")
666            .await
667            .unwrap();
668        memory
669            .sqlite()
670            .save_message(cid, "user", "real message")
671            .await
672            .unwrap();
673
674        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
675            std::sync::Arc::new(memory),
676            cid,
677            50,
678            5,
679            100,
680        );
681
682        let messages_before = agent.msg.messages.len();
683        agent.load_history().await.unwrap();
684        // Only the non-empty message is loaded
685        assert_eq!(agent.msg.messages.len(), messages_before + 1);
686    }
687
688    #[tokio::test]
689    async fn load_history_with_empty_store_returns_ok() {
690        let provider = mock_provider(vec![]);
691        let channel = MockChannel::new(vec![]);
692        let registry = create_test_registry();
693        let executor = MockToolExecutor::no_tools();
694
695        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
696        let cid = memory.sqlite().create_conversation().await.unwrap();
697
698        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
699            std::sync::Arc::new(memory),
700            cid,
701            50,
702            5,
703            100,
704        );
705
706        let messages_before = agent.msg.messages.len();
707        agent.load_history().await.unwrap();
708        // No messages added — empty history
709        assert_eq!(agent.msg.messages.len(), messages_before);
710    }
711
712    #[tokio::test]
713    async fn load_history_increments_session_count_for_existing_messages() {
714        let provider = mock_provider(vec![]);
715        let channel = MockChannel::new(vec![]);
716        let registry = create_test_registry();
717        let executor = MockToolExecutor::no_tools();
718
719        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
720        let cid = memory.sqlite().create_conversation().await.unwrap();
721
722        // Save two messages — they start with session_count = 0.
723        let id1 = memory
724            .sqlite()
725            .save_message(cid, "user", "hello")
726            .await
727            .unwrap();
728        let id2 = memory
729            .sqlite()
730            .save_message(cid, "assistant", "hi")
731            .await
732            .unwrap();
733
734        let memory_arc = std::sync::Arc::new(memory);
735        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
736            memory_arc.clone(),
737            cid,
738            50,
739            5,
740            100,
741        );
742
743        agent.load_history().await.unwrap();
744
745        // Both episodic messages must have session_count = 1 after restore.
746        let counts: Vec<i64> =
747            sqlx::query_scalar("SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id")
748                .bind(id1)
749                .bind(id2)
750                .fetch_all(memory_arc.sqlite().pool())
751                .await
752                .unwrap();
753        assert_eq!(
754            counts,
755            vec![1, 1],
756            "session_count must be 1 after first restore"
757        );
758    }
759
760    #[tokio::test]
761    async fn load_history_does_not_increment_session_count_for_new_conversation() {
762        let provider = mock_provider(vec![]);
763        let channel = MockChannel::new(vec![]);
764        let registry = create_test_registry();
765        let executor = MockToolExecutor::no_tools();
766
767        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
768        let cid = memory.sqlite().create_conversation().await.unwrap();
769
770        // No messages saved — empty conversation.
771        let memory_arc = std::sync::Arc::new(memory);
772        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
773            memory_arc.clone(),
774            cid,
775            50,
776            5,
777            100,
778        );
779
780        agent.load_history().await.unwrap();
781
782        // No rows → no session_count increments → query returns empty.
783        let counts: Vec<i64> =
784            sqlx::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
785                .bind(cid)
786                .fetch_all(memory_arc.sqlite().pool())
787                .await
788                .unwrap();
789        assert!(counts.is_empty(), "new conversation must have no messages");
790    }
791
792    #[tokio::test]
793    async fn persist_message_without_memory_silently_returns() {
794        // No memory configured — persist_message must not panic
795        let provider = mock_provider(vec![]);
796        let channel = MockChannel::new(vec![]);
797        let registry = create_test_registry();
798        let executor = MockToolExecutor::no_tools();
799        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
800
801        // Must not panic and must complete
802        agent.persist_message(Role::User, "hello", &[], false).await;
803    }
804
805    #[tokio::test]
806    async fn persist_message_assistant_autosave_false_uses_save_only() {
807        let provider = mock_provider(vec![]);
808        let channel = MockChannel::new(vec![]);
809        let registry = create_test_registry();
810        let executor = MockToolExecutor::no_tools();
811
812        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
813        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
814        let cid = memory.sqlite().create_conversation().await.unwrap();
815
816        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
817            .with_metrics(tx)
818            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
819            .with_autosave_config(false, 20);
820
821        agent
822            .persist_message(Role::Assistant, "short assistant reply", &[], false)
823            .await;
824
825        let history = agent
826            .memory_state
827            .memory
828            .as_ref()
829            .unwrap()
830            .sqlite()
831            .load_history(cid, 50)
832            .await
833            .unwrap();
834        assert_eq!(history.len(), 1, "message must be saved");
835        assert_eq!(history[0].content, "short assistant reply");
836        // embeddings_generated must remain 0 — save_only path does not embed
837        assert_eq!(rx.borrow().embeddings_generated, 0);
838    }
839
840    #[tokio::test]
841    async fn persist_message_assistant_below_min_length_uses_save_only() {
842        let provider = mock_provider(vec![]);
843        let channel = MockChannel::new(vec![]);
844        let registry = create_test_registry();
845        let executor = MockToolExecutor::no_tools();
846
847        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
848        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
849        let cid = memory.sqlite().create_conversation().await.unwrap();
850
851        // autosave_assistant=true but min_length=1000 — short content falls back to save_only
852        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
853            .with_metrics(tx)
854            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
855            .with_autosave_config(true, 1000);
856
857        agent
858            .persist_message(Role::Assistant, "too short", &[], false)
859            .await;
860
861        let history = agent
862            .memory_state
863            .memory
864            .as_ref()
865            .unwrap()
866            .sqlite()
867            .load_history(cid, 50)
868            .await
869            .unwrap();
870        assert_eq!(history.len(), 1, "message must be saved");
871        assert_eq!(history[0].content, "too short");
872        assert_eq!(rx.borrow().embeddings_generated, 0);
873    }
874
875    #[tokio::test]
876    async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
877        // content.len() == autosave_min_length → should_embed = true (>= boundary).
878        let provider = mock_provider(vec![]);
879        let channel = MockChannel::new(vec![]);
880        let registry = create_test_registry();
881        let executor = MockToolExecutor::no_tools();
882
883        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
884        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
885        let cid = memory.sqlite().create_conversation().await.unwrap();
886
887        let min_length = 10usize;
888        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
889            .with_metrics(tx)
890            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
891            .with_autosave_config(true, min_length);
892
893        // Exact boundary: len == min_length → embed path.
894        let content_at_boundary = "A".repeat(min_length);
895        assert_eq!(content_at_boundary.len(), min_length);
896        agent
897            .persist_message(Role::Assistant, &content_at_boundary, &[], false)
898            .await;
899
900        // sqlite_message_count must be incremented regardless of embedding success.
901        assert_eq!(rx.borrow().sqlite_message_count, 1);
902    }
903
904    #[tokio::test]
905    async fn persist_message_assistant_one_below_min_length_uses_save_only() {
906        // content.len() == autosave_min_length - 1 → should_embed = false (below boundary).
907        let provider = mock_provider(vec![]);
908        let channel = MockChannel::new(vec![]);
909        let registry = create_test_registry();
910        let executor = MockToolExecutor::no_tools();
911
912        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
913        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
914        let cid = memory.sqlite().create_conversation().await.unwrap();
915
916        let min_length = 10usize;
917        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
918            .with_metrics(tx)
919            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
920            .with_autosave_config(true, min_length);
921
922        // One below boundary: len == min_length - 1 → save_only path, no embedding.
923        let content_below_boundary = "A".repeat(min_length - 1);
924        assert_eq!(content_below_boundary.len(), min_length - 1);
925        agent
926            .persist_message(Role::Assistant, &content_below_boundary, &[], false)
927            .await;
928
929        let history = agent
930            .memory_state
931            .memory
932            .as_ref()
933            .unwrap()
934            .sqlite()
935            .load_history(cid, 50)
936            .await
937            .unwrap();
938        assert_eq!(history.len(), 1, "message must still be saved");
939        // save_only path does not embed.
940        assert_eq!(rx.borrow().embeddings_generated, 0);
941    }
942
943    #[tokio::test]
944    async fn persist_message_increments_unsummarized_count() {
945        let provider = mock_provider(vec![]);
946        let channel = MockChannel::new(vec![]);
947        let registry = create_test_registry();
948        let executor = MockToolExecutor::no_tools();
949
950        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
951        let cid = memory.sqlite().create_conversation().await.unwrap();
952
953        // threshold=100 ensures no summarization is triggered
954        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
955            std::sync::Arc::new(memory),
956            cid,
957            50,
958            5,
959            100,
960        );
961
962        assert_eq!(agent.memory_state.unsummarized_count, 0);
963
964        agent.persist_message(Role::User, "first", &[], false).await;
965        assert_eq!(agent.memory_state.unsummarized_count, 1);
966
967        agent
968            .persist_message(Role::User, "second", &[], false)
969            .await;
970        assert_eq!(agent.memory_state.unsummarized_count, 2);
971    }
972
973    #[tokio::test]
974    async fn check_summarization_resets_counter_on_success() {
975        let provider = mock_provider(vec![]);
976        let channel = MockChannel::new(vec![]);
977        let registry = create_test_registry();
978        let executor = MockToolExecutor::no_tools();
979
980        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
981        let cid = memory.sqlite().create_conversation().await.unwrap();
982
983        // threshold=1 so the second persist triggers summarization check (count > threshold)
984        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
985            std::sync::Arc::new(memory),
986            cid,
987            50,
988            5,
989            1,
990        );
991
992        agent.persist_message(Role::User, "msg1", &[], false).await;
993        agent.persist_message(Role::User, "msg2", &[], false).await;
994
995        // After summarization attempt (summarize returns Ok(None) since no messages qualify),
996        // the counter is NOT reset to 0 — only reset on Ok(Some(_)).
997        // This verifies check_summarization is called and the guard condition works.
998        // unsummarized_count must be >= 2 before any summarization or 0 if summarization ran.
999        assert!(agent.memory_state.unsummarized_count <= 2);
1000    }
1001
1002    #[tokio::test]
1003    async fn unsummarized_count_not_incremented_without_memory() {
1004        let provider = mock_provider(vec![]);
1005        let channel = MockChannel::new(vec![]);
1006        let registry = create_test_registry();
1007        let executor = MockToolExecutor::no_tools();
1008        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1009
1010        agent.persist_message(Role::User, "hello", &[], false).await;
1011        // No memory configured — persist_message returns early, counter must stay 0.
1012        assert_eq!(agent.memory_state.unsummarized_count, 0);
1013    }
1014
1015    // R-CRIT-01: unit tests for maybe_spawn_graph_extraction guard conditions.
1016    mod graph_extraction_guards {
1017        use super::*;
1018        use crate::config::GraphConfig;
1019        use zeph_llm::provider::MessageMetadata;
1020        use zeph_memory::graph::GraphStore;
1021
1022        fn enabled_graph_config() -> GraphConfig {
1023            GraphConfig {
1024                enabled: true,
1025                ..GraphConfig::default()
1026            }
1027        }
1028
1029        async fn agent_with_graph(
1030            provider: &AnyProvider,
1031            config: GraphConfig,
1032        ) -> Agent<MockChannel> {
1033            let memory =
1034                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1035            let cid = memory.sqlite().create_conversation().await.unwrap();
1036            Agent::new(
1037                provider.clone(),
1038                MockChannel::new(vec![]),
1039                create_test_registry(),
1040                None,
1041                5,
1042                MockToolExecutor::no_tools(),
1043            )
1044            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1045            .with_graph_config(config)
1046        }
1047
1048        #[tokio::test]
1049        async fn injection_flag_guard_skips_extraction() {
1050            // has_injection_flags=true → extraction must be skipped; no counter in graph_metadata.
1051            let provider = mock_provider(vec![]);
1052            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1053            let pool = agent
1054                .memory_state
1055                .memory
1056                .as_ref()
1057                .unwrap()
1058                .sqlite()
1059                .pool()
1060                .clone();
1061
1062            agent
1063                .maybe_spawn_graph_extraction("I use Rust", true, false)
1064                .await;
1065
1066            // Give any accidental spawn time to settle.
1067            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1068
1069            let store = GraphStore::new(pool);
1070            let count = store.get_metadata("extraction_count").await.unwrap();
1071            assert!(
1072                count.is_none(),
1073                "injection flag must prevent extraction_count from being written"
1074            );
1075        }
1076
1077        #[tokio::test]
1078        async fn disabled_config_guard_skips_extraction() {
1079            // graph.enabled=false → extraction must be skipped.
1080            let provider = mock_provider(vec![]);
1081            let disabled_cfg = GraphConfig {
1082                enabled: false,
1083                ..GraphConfig::default()
1084            };
1085            let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1086            let pool = agent
1087                .memory_state
1088                .memory
1089                .as_ref()
1090                .unwrap()
1091                .sqlite()
1092                .pool()
1093                .clone();
1094
1095            agent
1096                .maybe_spawn_graph_extraction("I use Rust", false, false)
1097                .await;
1098
1099            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1100
1101            let store = GraphStore::new(pool);
1102            let count = store.get_metadata("extraction_count").await.unwrap();
1103            assert!(
1104                count.is_none(),
1105                "disabled graph config must prevent extraction"
1106            );
1107        }
1108
1109        #[tokio::test]
1110        async fn happy_path_fires_extraction() {
1111            // With enabled config and no injection flags, extraction is spawned.
1112            // MockProvider returns None (no entities), but the counter must be incremented.
1113            let provider = mock_provider(vec![]);
1114            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1115            let pool = agent
1116                .memory_state
1117                .memory
1118                .as_ref()
1119                .unwrap()
1120                .sqlite()
1121                .pool()
1122                .clone();
1123
1124            agent
1125                .maybe_spawn_graph_extraction("I use Rust for systems programming", false, false)
1126                .await;
1127
1128            // Wait for the spawned task to complete.
1129            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1130
1131            let store = GraphStore::new(pool);
1132            let count = store.get_metadata("extraction_count").await.unwrap();
1133            assert!(
1134                count.is_some(),
1135                "happy-path extraction must increment extraction_count"
1136            );
1137        }
1138
1139        #[tokio::test]
1140        async fn tool_result_parts_guard_skips_extraction() {
1141            // FIX-1 regression: has_tool_result_parts=true → extraction must be skipped.
1142            // Tool result messages contain raw structured output (TOML, JSON, code) — not
1143            // conversational content. Extracting entities from them produces graph noise.
1144            let provider = mock_provider(vec![]);
1145            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1146            let pool = agent
1147                .memory_state
1148                .memory
1149                .as_ref()
1150                .unwrap()
1151                .sqlite()
1152                .pool()
1153                .clone();
1154
1155            agent
1156                .maybe_spawn_graph_extraction(
1157                    "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1158                    false,
1159                    true, // has_tool_result_parts
1160                )
1161                .await;
1162
1163            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1164
1165            let store = GraphStore::new(pool);
1166            let count = store.get_metadata("extraction_count").await.unwrap();
1167            assert!(
1168                count.is_none(),
1169                "tool result message must not trigger graph extraction"
1170            );
1171        }
1172
1173        #[tokio::test]
1174        async fn context_filter_excludes_tool_result_messages() {
1175            // FIX-2: context_messages must not include tool result user messages.
1176            // When maybe_spawn_graph_extraction collects context, it filters out
1177            // Role::User messages that contain ToolResult parts — only conversational
1178            // user messages are included as extraction context.
1179            //
1180            // This test verifies the guard fires: a tool result message alone is passed
1181            // (has_tool_result_parts=true) → extraction is skipped entirely, so context
1182            // filtering is not exercised. We verify FIX-2 by ensuring a prior tool result
1183            // message in agent.msg.messages is excluded when a subsequent conversational message
1184            // triggers extraction.
1185            let provider = mock_provider(vec![]);
1186            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1187
1188            // Add a tool result message to the agent's message history — this simulates
1189            // a tool call response that arrived before the current conversational turn.
1190            agent.msg.messages.push(Message {
1191                role: Role::User,
1192                content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1193                parts: vec![MessagePart::ToolResult {
1194                    tool_use_id: "abc".to_owned(),
1195                    content: "provider_type = \"openai\"".to_owned(),
1196                    is_error: false,
1197                }],
1198                metadata: MessageMetadata::default(),
1199            });
1200
1201            let pool = agent
1202                .memory_state
1203                .memory
1204                .as_ref()
1205                .unwrap()
1206                .sqlite()
1207                .pool()
1208                .clone();
1209
1210            // Trigger extraction for a conversational message (not a tool result).
1211            agent
1212                .maybe_spawn_graph_extraction("I prefer Rust for systems programming", false, false)
1213                .await;
1214
1215            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1216
1217            // Extraction must have fired (conversational message, no injection flags).
1218            let store = GraphStore::new(pool);
1219            let count = store.get_metadata("extraction_count").await.unwrap();
1220            assert!(
1221                count.is_some(),
1222                "conversational message must trigger extraction even with prior tool result in history"
1223            );
1224        }
1225    }
1226
1227    #[tokio::test]
1228    async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1229        let provider = mock_provider(vec![]);
1230        let channel = MockChannel::new(vec![]);
1231        let registry = create_test_registry();
1232        let executor = MockToolExecutor::no_tools();
1233
1234        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1235        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1236        let cid = memory.sqlite().create_conversation().await.unwrap();
1237
1238        // autosave_assistant=false — but User role always takes embedding path
1239        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1240            .with_metrics(tx)
1241            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1242            .with_autosave_config(false, 20);
1243
1244        let long_user_msg = "A".repeat(100);
1245        agent
1246            .persist_message(Role::User, &long_user_msg, &[], false)
1247            .await;
1248
1249        let history = agent
1250            .memory_state
1251            .memory
1252            .as_ref()
1253            .unwrap()
1254            .sqlite()
1255            .load_history(cid, 50)
1256            .await
1257            .unwrap();
1258        assert_eq!(history.len(), 1, "user message must be saved");
1259        // User messages go through remember_with_parts (embedding path).
1260        // sqlite_message_count must increment regardless of Qdrant availability.
1261        assert_eq!(rx.borrow().sqlite_message_count, 1);
1262    }
1263
1264    // Round-trip tests: verify that persist_message saves the correct parts and they
1265    // are restored correctly by load_history.
1266
1267    #[tokio::test]
1268    async fn persist_message_saves_correct_tool_use_parts() {
1269        use zeph_llm::provider::MessagePart;
1270
1271        let provider = mock_provider(vec![]);
1272        let channel = MockChannel::new(vec![]);
1273        let registry = create_test_registry();
1274        let executor = MockToolExecutor::no_tools();
1275
1276        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1277        let cid = memory.sqlite().create_conversation().await.unwrap();
1278
1279        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1280            std::sync::Arc::new(memory),
1281            cid,
1282            50,
1283            5,
1284            100,
1285        );
1286
1287        let parts = vec![MessagePart::ToolUse {
1288            id: "call_abc123".to_string(),
1289            name: "read_file".to_string(),
1290            input: serde_json::json!({"path": "/tmp/test.txt"}),
1291        }];
1292        let content = "[tool_use: read_file(call_abc123)]";
1293
1294        agent
1295            .persist_message(Role::Assistant, content, &parts, false)
1296            .await;
1297
1298        let history = agent
1299            .memory_state
1300            .memory
1301            .as_ref()
1302            .unwrap()
1303            .sqlite()
1304            .load_history(cid, 50)
1305            .await
1306            .unwrap();
1307
1308        assert_eq!(history.len(), 1);
1309        assert_eq!(history[0].role, Role::Assistant);
1310        assert_eq!(history[0].content, content);
1311        assert_eq!(history[0].parts.len(), 1);
1312        match &history[0].parts[0] {
1313            MessagePart::ToolUse { id, name, .. } => {
1314                assert_eq!(id, "call_abc123");
1315                assert_eq!(name, "read_file");
1316            }
1317            other => panic!("expected ToolUse part, got {other:?}"),
1318        }
1319        // Regression guard: assistant message must NOT have ToolResult parts
1320        assert!(
1321            !history[0]
1322                .parts
1323                .iter()
1324                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1325            "assistant message must not contain ToolResult parts"
1326        );
1327    }
1328
1329    #[tokio::test]
1330    async fn persist_message_saves_correct_tool_result_parts() {
1331        use zeph_llm::provider::MessagePart;
1332
1333        let provider = mock_provider(vec![]);
1334        let channel = MockChannel::new(vec![]);
1335        let registry = create_test_registry();
1336        let executor = MockToolExecutor::no_tools();
1337
1338        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1339        let cid = memory.sqlite().create_conversation().await.unwrap();
1340
1341        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1342            std::sync::Arc::new(memory),
1343            cid,
1344            50,
1345            5,
1346            100,
1347        );
1348
1349        let parts = vec![MessagePart::ToolResult {
1350            tool_use_id: "call_abc123".to_string(),
1351            content: "file contents here".to_string(),
1352            is_error: false,
1353        }];
1354        let content = "[tool_result: call_abc123]\nfile contents here";
1355
1356        agent
1357            .persist_message(Role::User, content, &parts, false)
1358            .await;
1359
1360        let history = agent
1361            .memory_state
1362            .memory
1363            .as_ref()
1364            .unwrap()
1365            .sqlite()
1366            .load_history(cid, 50)
1367            .await
1368            .unwrap();
1369
1370        assert_eq!(history.len(), 1);
1371        assert_eq!(history[0].role, Role::User);
1372        assert_eq!(history[0].content, content);
1373        assert_eq!(history[0].parts.len(), 1);
1374        match &history[0].parts[0] {
1375            MessagePart::ToolResult {
1376                tool_use_id,
1377                content: result_content,
1378                is_error,
1379            } => {
1380                assert_eq!(tool_use_id, "call_abc123");
1381                assert_eq!(result_content, "file contents here");
1382                assert!(!is_error);
1383            }
1384            other => panic!("expected ToolResult part, got {other:?}"),
1385        }
1386        // Regression guard: user message with ToolResult must NOT have ToolUse parts
1387        assert!(
1388            !history[0]
1389                .parts
1390                .iter()
1391                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1392            "user ToolResult message must not contain ToolUse parts"
1393        );
1394    }
1395
1396    #[tokio::test]
1397    async fn persist_message_roundtrip_preserves_role_part_alignment() {
1398        use zeph_llm::provider::MessagePart;
1399
1400        let provider = mock_provider(vec![]);
1401        let channel = MockChannel::new(vec![]);
1402        let registry = create_test_registry();
1403        let executor = MockToolExecutor::no_tools();
1404
1405        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1406        let cid = memory.sqlite().create_conversation().await.unwrap();
1407
1408        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1409            std::sync::Arc::new(memory),
1410            cid,
1411            50,
1412            5,
1413            100,
1414        );
1415
1416        // Persist assistant message with ToolUse parts
1417        let assistant_parts = vec![MessagePart::ToolUse {
1418            id: "id_1".to_string(),
1419            name: "list_dir".to_string(),
1420            input: serde_json::json!({"path": "/tmp"}),
1421        }];
1422        agent
1423            .persist_message(
1424                Role::Assistant,
1425                "[tool_use: list_dir(id_1)]",
1426                &assistant_parts,
1427                false,
1428            )
1429            .await;
1430
1431        // Persist user message with ToolResult parts
1432        let user_parts = vec![MessagePart::ToolResult {
1433            tool_use_id: "id_1".to_string(),
1434            content: "file1.txt\nfile2.txt".to_string(),
1435            is_error: false,
1436        }];
1437        agent
1438            .persist_message(
1439                Role::User,
1440                "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1441                &user_parts,
1442                false,
1443            )
1444            .await;
1445
1446        let history = agent
1447            .memory_state
1448            .memory
1449            .as_ref()
1450            .unwrap()
1451            .sqlite()
1452            .load_history(cid, 50)
1453            .await
1454            .unwrap();
1455
1456        assert_eq!(history.len(), 2);
1457
1458        // First message: assistant + ToolUse
1459        assert_eq!(history[0].role, Role::Assistant);
1460        assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1461        assert!(
1462            matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1463            "first message must be assistant ToolUse"
1464        );
1465
1466        // Second message: user + ToolResult
1467        assert_eq!(history[1].role, Role::User);
1468        assert_eq!(
1469            history[1].content,
1470            "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1471        );
1472        assert!(
1473            matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1474            "second message must be user ToolResult"
1475        );
1476
1477        // Cross-role regression guard: no swapped parts
1478        assert!(
1479            !history[0]
1480                .parts
1481                .iter()
1482                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1483            "assistant message must not have ToolResult parts"
1484        );
1485        assert!(
1486            !history[1]
1487                .parts
1488                .iter()
1489                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1490            "user message must not have ToolUse parts"
1491        );
1492    }
1493
1494    #[tokio::test]
1495    async fn persist_message_saves_correct_tool_output_parts() {
1496        use zeph_llm::provider::MessagePart;
1497
1498        let provider = mock_provider(vec![]);
1499        let channel = MockChannel::new(vec![]);
1500        let registry = create_test_registry();
1501        let executor = MockToolExecutor::no_tools();
1502
1503        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1504        let cid = memory.sqlite().create_conversation().await.unwrap();
1505
1506        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1507            std::sync::Arc::new(memory),
1508            cid,
1509            50,
1510            5,
1511            100,
1512        );
1513
1514        let parts = vec![MessagePart::ToolOutput {
1515            tool_name: "shell".to_string(),
1516            body: "hello from shell".to_string(),
1517            compacted_at: None,
1518        }];
1519        let content = "[tool: shell]\nhello from shell";
1520
1521        agent
1522            .persist_message(Role::User, content, &parts, false)
1523            .await;
1524
1525        let history = agent
1526            .memory_state
1527            .memory
1528            .as_ref()
1529            .unwrap()
1530            .sqlite()
1531            .load_history(cid, 50)
1532            .await
1533            .unwrap();
1534
1535        assert_eq!(history.len(), 1);
1536        assert_eq!(history[0].role, Role::User);
1537        assert_eq!(history[0].content, content);
1538        assert_eq!(history[0].parts.len(), 1);
1539        match &history[0].parts[0] {
1540            MessagePart::ToolOutput {
1541                tool_name,
1542                body,
1543                compacted_at,
1544            } => {
1545                assert_eq!(tool_name, "shell");
1546                assert_eq!(body, "hello from shell");
1547                assert!(compacted_at.is_none());
1548            }
1549            other => panic!("expected ToolOutput part, got {other:?}"),
1550        }
1551    }
1552
1553    // --- sanitize_tool_pairs unit tests ---
1554
1555    #[tokio::test]
1556    async fn load_history_removes_trailing_orphan_tool_use() {
1557        use zeph_llm::provider::MessagePart;
1558
1559        let provider = mock_provider(vec![]);
1560        let channel = MockChannel::new(vec![]);
1561        let registry = create_test_registry();
1562        let executor = MockToolExecutor::no_tools();
1563
1564        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1565        let cid = memory.sqlite().create_conversation().await.unwrap();
1566        let sqlite = memory.sqlite();
1567
1568        // user message (normal)
1569        sqlite
1570            .save_message(cid, "user", "do something with a tool")
1571            .await
1572            .unwrap();
1573
1574        // assistant message with ToolUse parts — no following tool_result (orphan)
1575        let parts = serde_json::to_string(&[MessagePart::ToolUse {
1576            id: "call_orphan".to_string(),
1577            name: "shell".to_string(),
1578            input: serde_json::json!({"command": "ls"}),
1579        }])
1580        .unwrap();
1581        sqlite
1582            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
1583            .await
1584            .unwrap();
1585
1586        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1587            std::sync::Arc::new(memory),
1588            cid,
1589            50,
1590            5,
1591            100,
1592        );
1593
1594        let messages_before = agent.msg.messages.len();
1595        agent.load_history().await.unwrap();
1596
1597        // Only the user message should be loaded; orphaned assistant tool_use removed.
1598        assert_eq!(
1599            agent.msg.messages.len(),
1600            messages_before + 1,
1601            "orphaned trailing tool_use must be removed"
1602        );
1603        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
1604    }
1605
1606    #[tokio::test]
1607    async fn load_history_removes_leading_orphan_tool_result() {
1608        use zeph_llm::provider::MessagePart;
1609
1610        let provider = mock_provider(vec![]);
1611        let channel = MockChannel::new(vec![]);
1612        let registry = create_test_registry();
1613        let executor = MockToolExecutor::no_tools();
1614
1615        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1616        let cid = memory.sqlite().create_conversation().await.unwrap();
1617        let sqlite = memory.sqlite();
1618
1619        // Leading orphan: user message with ToolResult but no preceding tool_use
1620        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
1621            tool_use_id: "call_missing".to_string(),
1622            content: "result data".to_string(),
1623            is_error: false,
1624        }])
1625        .unwrap();
1626        sqlite
1627            .save_message_with_parts(
1628                cid,
1629                "user",
1630                "[tool_result: call_missing]\nresult data",
1631                &result_parts,
1632            )
1633            .await
1634            .unwrap();
1635
1636        // A valid assistant reply after the orphan
1637        sqlite
1638            .save_message(cid, "assistant", "here is my response")
1639            .await
1640            .unwrap();
1641
1642        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1643            std::sync::Arc::new(memory),
1644            cid,
1645            50,
1646            5,
1647            100,
1648        );
1649
1650        let messages_before = agent.msg.messages.len();
1651        agent.load_history().await.unwrap();
1652
1653        // Orphaned leading tool_result removed; only assistant message kept.
1654        assert_eq!(
1655            agent.msg.messages.len(),
1656            messages_before + 1,
1657            "orphaned leading tool_result must be removed"
1658        );
1659        assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
1660    }
1661
1662    #[tokio::test]
1663    async fn load_history_preserves_complete_tool_pairs() {
1664        use zeph_llm::provider::MessagePart;
1665
1666        let provider = mock_provider(vec![]);
1667        let channel = MockChannel::new(vec![]);
1668        let registry = create_test_registry();
1669        let executor = MockToolExecutor::no_tools();
1670
1671        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1672        let cid = memory.sqlite().create_conversation().await.unwrap();
1673        let sqlite = memory.sqlite();
1674
1675        // Complete tool_use / tool_result pair
1676        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
1677            id: "call_ok".to_string(),
1678            name: "shell".to_string(),
1679            input: serde_json::json!({"command": "pwd"}),
1680        }])
1681        .unwrap();
1682        sqlite
1683            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
1684            .await
1685            .unwrap();
1686
1687        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
1688            tool_use_id: "call_ok".to_string(),
1689            content: "/home/user".to_string(),
1690            is_error: false,
1691        }])
1692        .unwrap();
1693        sqlite
1694            .save_message_with_parts(
1695                cid,
1696                "user",
1697                "[tool_result: call_ok]\n/home/user",
1698                &result_parts,
1699            )
1700            .await
1701            .unwrap();
1702
1703        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1704            std::sync::Arc::new(memory),
1705            cid,
1706            50,
1707            5,
1708            100,
1709        );
1710
1711        let messages_before = agent.msg.messages.len();
1712        agent.load_history().await.unwrap();
1713
1714        // Both messages must be preserved.
1715        assert_eq!(
1716            agent.msg.messages.len(),
1717            messages_before + 2,
1718            "complete tool_use/tool_result pair must be preserved"
1719        );
1720        assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
1721        assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
1722    }
1723
1724    #[tokio::test]
1725    async fn load_history_handles_multiple_trailing_orphans() {
1726        use zeph_llm::provider::MessagePart;
1727
1728        let provider = mock_provider(vec![]);
1729        let channel = MockChannel::new(vec![]);
1730        let registry = create_test_registry();
1731        let executor = MockToolExecutor::no_tools();
1732
1733        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1734        let cid = memory.sqlite().create_conversation().await.unwrap();
1735        let sqlite = memory.sqlite();
1736
1737        // Normal user message
1738        sqlite.save_message(cid, "user", "start").await.unwrap();
1739
1740        // First orphaned tool_use
1741        let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
1742            id: "call_1".to_string(),
1743            name: "shell".to_string(),
1744            input: serde_json::json!({}),
1745        }])
1746        .unwrap();
1747        sqlite
1748            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
1749            .await
1750            .unwrap();
1751
1752        // Second orphaned tool_use (consecutive, no tool_result between them)
1753        let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
1754            id: "call_2".to_string(),
1755            name: "read_file".to_string(),
1756            input: serde_json::json!({}),
1757        }])
1758        .unwrap();
1759        sqlite
1760            .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
1761            .await
1762            .unwrap();
1763
1764        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1765            std::sync::Arc::new(memory),
1766            cid,
1767            50,
1768            5,
1769            100,
1770        );
1771
1772        let messages_before = agent.msg.messages.len();
1773        agent.load_history().await.unwrap();
1774
1775        // Both orphaned tool_use messages removed; only the user message kept.
1776        assert_eq!(
1777            agent.msg.messages.len(),
1778            messages_before + 1,
1779            "all trailing orphaned tool_use messages must be removed"
1780        );
1781        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
1782    }
1783
1784    #[tokio::test]
1785    async fn load_history_no_tool_messages_unchanged() {
1786        let provider = mock_provider(vec![]);
1787        let channel = MockChannel::new(vec![]);
1788        let registry = create_test_registry();
1789        let executor = MockToolExecutor::no_tools();
1790
1791        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1792        let cid = memory.sqlite().create_conversation().await.unwrap();
1793        let sqlite = memory.sqlite();
1794
1795        sqlite.save_message(cid, "user", "hello").await.unwrap();
1796        sqlite
1797            .save_message(cid, "assistant", "hi there")
1798            .await
1799            .unwrap();
1800        sqlite
1801            .save_message(cid, "user", "how are you?")
1802            .await
1803            .unwrap();
1804
1805        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1806            std::sync::Arc::new(memory),
1807            cid,
1808            50,
1809            5,
1810            100,
1811        );
1812
1813        let messages_before = agent.msg.messages.len();
1814        agent.load_history().await.unwrap();
1815
1816        // All three plain messages must be preserved.
1817        assert_eq!(
1818            agent.msg.messages.len(),
1819            messages_before + 3,
1820            "plain messages without tool parts must pass through unchanged"
1821        );
1822    }
1823
1824    #[tokio::test]
1825    async fn load_history_removes_both_leading_and_trailing_orphans() {
1826        use zeph_llm::provider::MessagePart;
1827
1828        let provider = mock_provider(vec![]);
1829        let channel = MockChannel::new(vec![]);
1830        let registry = create_test_registry();
1831        let executor = MockToolExecutor::no_tools();
1832
1833        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1834        let cid = memory.sqlite().create_conversation().await.unwrap();
1835        let sqlite = memory.sqlite();
1836
1837        // Leading orphan: user message with ToolResult, no preceding tool_use
1838        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
1839            tool_use_id: "call_leading".to_string(),
1840            content: "orphaned result".to_string(),
1841            is_error: false,
1842        }])
1843        .unwrap();
1844        sqlite
1845            .save_message_with_parts(
1846                cid,
1847                "user",
1848                "[tool_result: call_leading]\norphaned result",
1849                &result_parts,
1850            )
1851            .await
1852            .unwrap();
1853
1854        // Valid middle messages
1855        sqlite
1856            .save_message(cid, "user", "what is 2+2?")
1857            .await
1858            .unwrap();
1859        sqlite.save_message(cid, "assistant", "4").await.unwrap();
1860
1861        // Trailing orphan: assistant message with ToolUse, no following tool_result
1862        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
1863            id: "call_trailing".to_string(),
1864            name: "shell".to_string(),
1865            input: serde_json::json!({"command": "date"}),
1866        }])
1867        .unwrap();
1868        sqlite
1869            .save_message_with_parts(
1870                cid,
1871                "assistant",
1872                "[tool_use: shell(call_trailing)]",
1873                &use_parts,
1874            )
1875            .await
1876            .unwrap();
1877
1878        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1879            std::sync::Arc::new(memory),
1880            cid,
1881            50,
1882            5,
1883            100,
1884        );
1885
1886        let messages_before = agent.msg.messages.len();
1887        agent.load_history().await.unwrap();
1888
1889        // Both orphans removed; only the 2 valid middle messages kept.
1890        assert_eq!(
1891            agent.msg.messages.len(),
1892            messages_before + 2,
1893            "both leading and trailing orphans must be removed"
1894        );
1895        assert_eq!(agent.msg.messages[messages_before].role, Role::User);
1896        assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
1897        assert_eq!(
1898            agent.msg.messages[messages_before + 1].role,
1899            Role::Assistant
1900        );
1901        assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
1902    }
1903
1904    /// RC1 regression: mid-history assistant[`ToolUse`] without a following user[`ToolResult`]
1905    /// must have its `ToolUse` parts stripped (text preserved). The message count stays the same
1906    /// because the assistant message has a text content fallback; only `ToolUse` parts are
1907    /// removed.
1908    #[tokio::test]
1909    async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
1910        use zeph_llm::provider::MessagePart;
1911
1912        let provider = mock_provider(vec![]);
1913        let channel = MockChannel::new(vec![]);
1914        let registry = create_test_registry();
1915        let executor = MockToolExecutor::no_tools();
1916
1917        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1918        let cid = memory.sqlite().create_conversation().await.unwrap();
1919        let sqlite = memory.sqlite();
1920
1921        // Normal first exchange.
1922        sqlite
1923            .save_message(cid, "user", "first question")
1924            .await
1925            .unwrap();
1926        sqlite
1927            .save_message(cid, "assistant", "first answer")
1928            .await
1929            .unwrap();
1930
1931        // Mid-history orphan: assistant with ToolUse but NO following ToolResult user message.
1932        // This models the compaction-split scenario (RC2) where replace_conversation hid the
1933        // user[ToolResult] but left the assistant[ToolUse] visible.
1934        let use_parts = serde_json::to_string(&[
1935            MessagePart::ToolUse {
1936                id: "call_mid_1".to_string(),
1937                name: "shell".to_string(),
1938                input: serde_json::json!({"command": "ls"}),
1939            },
1940            MessagePart::Text {
1941                text: "Let me check the files.".to_string(),
1942            },
1943        ])
1944        .unwrap();
1945        sqlite
1946            .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
1947            .await
1948            .unwrap();
1949
1950        // Another normal exchange after the orphan.
1951        sqlite
1952            .save_message(cid, "user", "second question")
1953            .await
1954            .unwrap();
1955        sqlite
1956            .save_message(cid, "assistant", "second answer")
1957            .await
1958            .unwrap();
1959
1960        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1961            std::sync::Arc::new(memory),
1962            cid,
1963            50,
1964            5,
1965            100,
1966        );
1967
1968        let messages_before = agent.msg.messages.len();
1969        agent.load_history().await.unwrap();
1970
1971        // All 5 messages remain (orphan message kept because it has text), but the orphaned
1972        // message must have its ToolUse parts stripped.
1973        assert_eq!(
1974            agent.msg.messages.len(),
1975            messages_before + 5,
1976            "message count must be 5 (orphan message kept — has text content)"
1977        );
1978
1979        // The orphaned assistant message (index 2 in the loaded slice) must have no ToolUse parts.
1980        let orphan = &agent.msg.messages[messages_before + 2];
1981        assert_eq!(orphan.role, Role::Assistant);
1982        assert!(
1983            !orphan
1984                .parts
1985                .iter()
1986                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1987            "orphaned ToolUse parts must be stripped from mid-history message"
1988        );
1989        // Text part must be preserved.
1990        assert!(
1991            orphan.parts.iter().any(
1992                |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
1993            ),
1994            "text content of orphaned assistant message must be preserved"
1995        );
1996    }
1997
1998    /// RC3 regression: a user message with empty `content` but non-empty `parts` (`ToolResult`)
1999    /// must NOT be skipped by `load_history`. Previously the empty-content check dropped these
2000    /// messages before `sanitize_tool_pairs` ran, leaving the preceding assistant `ToolUse`
2001    /// orphaned.
2002    #[tokio::test]
2003    async fn load_history_keeps_tool_only_user_message() {
2004        use zeph_llm::provider::MessagePart;
2005
2006        let provider = mock_provider(vec![]);
2007        let channel = MockChannel::new(vec![]);
2008        let registry = create_test_registry();
2009        let executor = MockToolExecutor::no_tools();
2010
2011        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2012        let cid = memory.sqlite().create_conversation().await.unwrap();
2013        let sqlite = memory.sqlite();
2014
2015        // Assistant sends a ToolUse.
2016        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2017            id: "call_rc3".to_string(),
2018            name: "memory_save".to_string(),
2019            input: serde_json::json!({"content": "something"}),
2020        }])
2021        .unwrap();
2022        sqlite
2023            .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2024            .await
2025            .unwrap();
2026
2027        // User message has empty text content but carries ToolResult in parts — native tool pattern.
2028        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2029            tool_use_id: "call_rc3".to_string(),
2030            content: "saved".to_string(),
2031            is_error: false,
2032        }])
2033        .unwrap();
2034        sqlite
2035            .save_message_with_parts(cid, "user", "", &result_parts)
2036            .await
2037            .unwrap();
2038
2039        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2040
2041        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2042            std::sync::Arc::new(memory),
2043            cid,
2044            50,
2045            5,
2046            100,
2047        );
2048
2049        let messages_before = agent.msg.messages.len();
2050        agent.load_history().await.unwrap();
2051
2052        // All 3 messages must be loaded — the empty-content ToolResult user message must NOT be
2053        // dropped.
2054        assert_eq!(
2055            agent.msg.messages.len(),
2056            messages_before + 3,
2057            "user message with empty content but ToolResult parts must not be dropped"
2058        );
2059
2060        // The user message at index 1 must still carry the ToolResult part.
2061        let user_msg = &agent.msg.messages[messages_before + 1];
2062        assert_eq!(user_msg.role, Role::User);
2063        assert!(
2064            user_msg.parts.iter().any(
2065                |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2066            ),
2067            "ToolResult part must be preserved on user message with empty content"
2068        );
2069    }
2070
2071    /// RC2 reverse pass: a user message with a `ToolResult` whose `tool_use_id` has no matching
2072    /// `ToolUse` in the preceding assistant message must be stripped by
2073    /// `strip_mid_history_orphans`.
2074    #[tokio::test]
2075    async fn strip_orphans_removes_orphaned_tool_result() {
2076        use zeph_llm::provider::MessagePart;
2077
2078        let provider = mock_provider(vec![]);
2079        let channel = MockChannel::new(vec![]);
2080        let registry = create_test_registry();
2081        let executor = MockToolExecutor::no_tools();
2082
2083        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2084        let cid = memory.sqlite().create_conversation().await.unwrap();
2085        let sqlite = memory.sqlite();
2086
2087        // Normal exchange before the orphan.
2088        sqlite.save_message(cid, "user", "hello").await.unwrap();
2089        sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2090
2091        // Assistant message that does NOT contain a ToolUse.
2092        sqlite
2093            .save_message(cid, "assistant", "plain answer")
2094            .await
2095            .unwrap();
2096
2097        // User message references a tool_use_id that was never sent by the preceding assistant.
2098        let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2099            tool_use_id: "call_nonexistent".to_string(),
2100            content: "stale result".to_string(),
2101            is_error: false,
2102        }])
2103        .unwrap();
2104        sqlite
2105            .save_message_with_parts(
2106                cid,
2107                "user",
2108                "[tool_result: call_nonexistent]\nstale result",
2109                &orphan_result_parts,
2110            )
2111            .await
2112            .unwrap();
2113
2114        sqlite
2115            .save_message(cid, "assistant", "final")
2116            .await
2117            .unwrap();
2118
2119        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2120            std::sync::Arc::new(memory),
2121            cid,
2122            50,
2123            5,
2124            100,
2125        );
2126
2127        let messages_before = agent.msg.messages.len();
2128        agent.load_history().await.unwrap();
2129
2130        // The orphaned ToolResult part must have been stripped from the user message.
2131        // The user message itself may be removed (parts empty + content non-empty) or kept with
2132        // the text content — but it must NOT retain the orphaned ToolResult part.
2133        let loaded = &agent.msg.messages[messages_before..];
2134        for msg in loaded {
2135            assert!(
2136                !msg.parts.iter().any(|p| matches!(
2137                    p,
2138                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2139                )),
2140                "orphaned ToolResult part must be stripped from history"
2141            );
2142        }
2143    }
2144
2145    /// RC2 reverse pass: a complete `tool_use` + `tool_result` pair must pass through the reverse
2146    /// orphan check intact; the fix must not strip valid `ToolResult` parts.
2147    #[tokio::test]
2148    async fn strip_orphans_keeps_complete_pair() {
2149        use zeph_llm::provider::MessagePart;
2150
2151        let provider = mock_provider(vec![]);
2152        let channel = MockChannel::new(vec![]);
2153        let registry = create_test_registry();
2154        let executor = MockToolExecutor::no_tools();
2155
2156        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2157        let cid = memory.sqlite().create_conversation().await.unwrap();
2158        let sqlite = memory.sqlite();
2159
2160        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2161            id: "call_valid".to_string(),
2162            name: "shell".to_string(),
2163            input: serde_json::json!({"command": "ls"}),
2164        }])
2165        .unwrap();
2166        sqlite
2167            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2168            .await
2169            .unwrap();
2170
2171        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2172            tool_use_id: "call_valid".to_string(),
2173            content: "file.rs".to_string(),
2174            is_error: false,
2175        }])
2176        .unwrap();
2177        sqlite
2178            .save_message_with_parts(cid, "user", "", &result_parts)
2179            .await
2180            .unwrap();
2181
2182        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2183            std::sync::Arc::new(memory),
2184            cid,
2185            50,
2186            5,
2187            100,
2188        );
2189
2190        let messages_before = agent.msg.messages.len();
2191        agent.load_history().await.unwrap();
2192
2193        assert_eq!(
2194            agent.msg.messages.len(),
2195            messages_before + 2,
2196            "complete tool_use/tool_result pair must be preserved"
2197        );
2198
2199        let user_msg = &agent.msg.messages[messages_before + 1];
2200        assert!(
2201            user_msg.parts.iter().any(|p| matches!(
2202                p,
2203                MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2204            )),
2205            "ToolResult part for a matched tool_use must not be stripped"
2206        );
2207    }
2208
2209    /// RC2 reverse pass: history with a mix of complete pairs and orphaned `ToolResult` messages.
2210    /// Orphaned `ToolResult` parts must be stripped; complete pairs must pass through intact.
2211    #[tokio::test]
2212    async fn strip_orphans_mixed_history() {
2213        use zeph_llm::provider::MessagePart;
2214
2215        let provider = mock_provider(vec![]);
2216        let channel = MockChannel::new(vec![]);
2217        let registry = create_test_registry();
2218        let executor = MockToolExecutor::no_tools();
2219
2220        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2221        let cid = memory.sqlite().create_conversation().await.unwrap();
2222        let sqlite = memory.sqlite();
2223
2224        // First: complete tool_use / tool_result pair.
2225        let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2226            id: "call_good".to_string(),
2227            name: "shell".to_string(),
2228            input: serde_json::json!({"command": "pwd"}),
2229        }])
2230        .unwrap();
2231        sqlite
2232            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2233            .await
2234            .unwrap();
2235
2236        let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2237            tool_use_id: "call_good".to_string(),
2238            content: "/home".to_string(),
2239            is_error: false,
2240        }])
2241        .unwrap();
2242        sqlite
2243            .save_message_with_parts(cid, "user", "", &result_parts_ok)
2244            .await
2245            .unwrap();
2246
2247        // Second: plain assistant message followed by an orphaned ToolResult user message.
2248        sqlite
2249            .save_message(cid, "assistant", "text only")
2250            .await
2251            .unwrap();
2252
2253        let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2254            tool_use_id: "call_ghost".to_string(),
2255            content: "ghost result".to_string(),
2256            is_error: false,
2257        }])
2258        .unwrap();
2259        sqlite
2260            .save_message_with_parts(
2261                cid,
2262                "user",
2263                "[tool_result: call_ghost]\nghost result",
2264                &orphan_parts,
2265            )
2266            .await
2267            .unwrap();
2268
2269        sqlite
2270            .save_message(cid, "assistant", "final reply")
2271            .await
2272            .unwrap();
2273
2274        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2275            std::sync::Arc::new(memory),
2276            cid,
2277            50,
2278            5,
2279            100,
2280        );
2281
2282        let messages_before = agent.msg.messages.len();
2283        agent.load_history().await.unwrap();
2284
2285        let loaded = &agent.msg.messages[messages_before..];
2286
2287        // The orphaned ToolResult part must not appear in any message.
2288        for msg in loaded {
2289            assert!(
2290                !msg.parts.iter().any(|p| matches!(
2291                    p,
2292                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2293                )),
2294                "orphaned ToolResult (call_ghost) must be stripped from history"
2295            );
2296        }
2297
2298        // The matched ToolResult must still be present on the user message following the
2299        // first assistant message.
2300        let has_good_result = loaded.iter().any(|msg| {
2301            msg.role == Role::User
2302                && msg.parts.iter().any(|p| {
2303                    matches!(
2304                        p,
2305                        MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2306                    )
2307                })
2308        });
2309        assert!(
2310            has_good_result,
2311            "matched ToolResult (call_good) must be preserved in history"
2312        );
2313    }
2314
2315    /// Regression: a properly matched `tool_use`/`tool_result` pair must NOT be touched by the
2316    /// mid-history scan — ensures the fix doesn't break valid tool exchanges.
2317    #[tokio::test]
2318    async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2319        use zeph_llm::provider::MessagePart;
2320
2321        let provider = mock_provider(vec![]);
2322        let channel = MockChannel::new(vec![]);
2323        let registry = create_test_registry();
2324        let executor = MockToolExecutor::no_tools();
2325
2326        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2327        let cid = memory.sqlite().create_conversation().await.unwrap();
2328        let sqlite = memory.sqlite();
2329
2330        sqlite
2331            .save_message(cid, "user", "run a command")
2332            .await
2333            .unwrap();
2334
2335        // Assistant sends a ToolUse.
2336        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2337            id: "call_ok".to_string(),
2338            name: "shell".to_string(),
2339            input: serde_json::json!({"command": "echo hi"}),
2340        }])
2341        .unwrap();
2342        sqlite
2343            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2344            .await
2345            .unwrap();
2346
2347        // Matching user ToolResult follows.
2348        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2349            tool_use_id: "call_ok".to_string(),
2350            content: "hi".to_string(),
2351            is_error: false,
2352        }])
2353        .unwrap();
2354        sqlite
2355            .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2356            .await
2357            .unwrap();
2358
2359        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2360
2361        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2362            std::sync::Arc::new(memory),
2363            cid,
2364            50,
2365            5,
2366            100,
2367        );
2368
2369        let messages_before = agent.msg.messages.len();
2370        agent.load_history().await.unwrap();
2371
2372        // All 4 messages preserved, tool_use parts intact.
2373        assert_eq!(
2374            agent.msg.messages.len(),
2375            messages_before + 4,
2376            "matched tool pair must not be removed"
2377        );
2378        let tool_msg = &agent.msg.messages[messages_before + 1];
2379        assert!(
2380            tool_msg
2381                .parts
2382                .iter()
2383                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2384            "matched ToolUse parts must be preserved"
2385        );
2386    }
2387
2388    /// RC5: `persist_cancelled_tool_results` must persist a tombstone user message containing
2389    /// `is_error=true` `ToolResult` parts for all `tool_calls` IDs so the preceding assistant
2390    /// `ToolUse` is never orphaned in the DB after a cancellation.
2391    #[tokio::test]
2392    async fn persist_cancelled_tool_results_pairs_tool_use() {
2393        use zeph_llm::provider::MessagePart;
2394
2395        let provider = mock_provider(vec![]);
2396        let channel = MockChannel::new(vec![]);
2397        let registry = create_test_registry();
2398        let executor = MockToolExecutor::no_tools();
2399
2400        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2401        let cid = memory.sqlite().create_conversation().await.unwrap();
2402
2403        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2404            std::sync::Arc::new(memory),
2405            cid,
2406            50,
2407            5,
2408            100,
2409        );
2410
2411        // Simulate: assistant message with two ToolUse parts already persisted.
2412        let tool_calls = vec![
2413            zeph_llm::provider::ToolUseRequest {
2414                id: "cancel_id_1".to_string(),
2415                name: "shell".to_string(),
2416                input: serde_json::json!({}),
2417            },
2418            zeph_llm::provider::ToolUseRequest {
2419                id: "cancel_id_2".to_string(),
2420                name: "read_file".to_string(),
2421                input: serde_json::json!({}),
2422            },
2423        ];
2424
2425        agent.persist_cancelled_tool_results(&tool_calls).await;
2426
2427        let history = agent
2428            .memory_state
2429            .memory
2430            .as_ref()
2431            .unwrap()
2432            .sqlite()
2433            .load_history(cid, 50)
2434            .await
2435            .unwrap();
2436
2437        // Exactly one user message must have been persisted.
2438        assert_eq!(history.len(), 1);
2439        assert_eq!(history[0].role, Role::User);
2440
2441        // It must contain is_error=true ToolResult for each tool call ID.
2442        for tc in &tool_calls {
2443            assert!(
2444                history[0].parts.iter().any(|p| matches!(
2445                    p,
2446                    MessagePart::ToolResult { tool_use_id, is_error, .. }
2447                        if tool_use_id == &tc.id && *is_error
2448                )),
2449                "tombstone ToolResult for {} must be present and is_error=true",
2450                tc.id
2451            );
2452        }
2453    }
2454}