Skip to main content

zeph_core/agent/
persistence.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashSet;
5
6use crate::channel::Channel;
7use zeph_llm::provider::{LlmProvider as _, Message, MessagePart, Role};
8use zeph_memory::store::role_str;
9
10use super::Agent;
11
12/// Remove orphaned `ToolUse`/`ToolResult` messages from restored history.
13///
14/// Four failure modes are handled:
15/// 1. Trailing orphan: the last message is an assistant with `ToolUse` parts but no subsequent
16///    user message with `ToolResult` — caused by LIMIT boundary splits or interrupted sessions.
17/// 2. Leading orphan: the first message is a user with `ToolResult` parts but no preceding
18///    assistant message with `ToolUse` — caused by LIMIT boundary cuts.
19/// 3. Mid-history orphaned `ToolUse`: an assistant message with `ToolUse` parts is not followed
20///    by a user message with matching `ToolResult` parts. The `ToolUse` parts are stripped;
21///    if no content remains the message is removed.
22/// 4. Mid-history orphaned `ToolResult`: a user message has `ToolResult` parts whose
23///    `tool_use_id` is not present in the preceding assistant message. Those `ToolResult` parts
24///    are stripped; if no content remains the message is removed.
25///
26/// Boundary cases are resolved in a loop before the mid-history scan runs.
27fn sanitize_tool_pairs(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
28    let mut removed = 0;
29    let mut db_ids: Vec<i64> = Vec::new();
30
31    loop {
32        // Remove trailing orphaned tool_use (assistant message with ToolUse, no following tool_result).
33        if let Some(last) = messages.last()
34            && last.role == Role::Assistant
35            && last
36                .parts
37                .iter()
38                .any(|p| matches!(p, MessagePart::ToolUse { .. }))
39        {
40            let ids: Vec<String> = last
41                .parts
42                .iter()
43                .filter_map(|p| {
44                    if let MessagePart::ToolUse { id, .. } = p {
45                        Some(id.clone())
46                    } else {
47                        None
48                    }
49                })
50                .collect();
51            tracing::warn!(
52                tool_ids = ?ids,
53                "removing orphaned trailing tool_use message from restored history"
54            );
55            if let Some(db_id) = messages.last().and_then(|m| m.metadata.db_id) {
56                db_ids.push(db_id);
57            }
58            messages.pop();
59            removed += 1;
60            continue;
61        }
62
63        // Remove leading orphaned tool_result (user message with ToolResult, no preceding tool_use).
64        if let Some(first) = messages.first()
65            && first.role == Role::User
66            && first
67                .parts
68                .iter()
69                .any(|p| matches!(p, MessagePart::ToolResult { .. }))
70        {
71            let ids: Vec<String> = first
72                .parts
73                .iter()
74                .filter_map(|p| {
75                    if let MessagePart::ToolResult { tool_use_id, .. } = p {
76                        Some(tool_use_id.clone())
77                    } else {
78                        None
79                    }
80                })
81                .collect();
82            tracing::warn!(
83                tool_use_ids = ?ids,
84                "removing orphaned leading tool_result message from restored history"
85            );
86            if let Some(db_id) = messages.first().and_then(|m| m.metadata.db_id) {
87                db_ids.push(db_id);
88            }
89            messages.remove(0);
90            removed += 1;
91            continue;
92        }
93
94        break;
95    }
96
97    // Mid-history scan: strip ToolUse parts from any assistant message whose tool IDs are not
98    // matched by ToolResult parts in the immediately following user message.
99    let (mid_removed, mid_db_ids) = strip_mid_history_orphans(messages);
100    removed += mid_removed;
101    db_ids.extend(mid_db_ids);
102
103    (removed, db_ids)
104}
105
106/// Collect `tool_use` IDs from `msg` that have no matching `ToolResult` in `next_msg`.
107fn orphaned_tool_use_ids(msg: &Message, next_msg: Option<&Message>) -> HashSet<String> {
108    let matched: HashSet<String> = next_msg
109        .filter(|n| n.role == Role::User)
110        .map(|n| {
111            msg.parts
112                .iter()
113                .filter_map(|p| if let MessagePart::ToolUse { id, .. } = p { Some(id.clone()) } else { None })
114                .filter(|uid| n.parts.iter().any(|np| matches!(np, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == uid)))
115                .collect()
116        })
117        .unwrap_or_default();
118    msg.parts
119        .iter()
120        .filter_map(|p| {
121            if let MessagePart::ToolUse { id, .. } = p
122                && !matched.contains(id)
123            {
124                Some(id.clone())
125            } else {
126                None
127            }
128        })
129        .collect()
130}
131
132/// Collect `tool_result` IDs from `msg` that have no matching `ToolUse` in `prev_msg`.
133fn orphaned_tool_result_ids(msg: &Message, prev_msg: Option<&Message>) -> HashSet<String> {
134    let avail: HashSet<&str> = prev_msg
135        .filter(|p| p.role == Role::Assistant)
136        .map(|p| {
137            p.parts
138                .iter()
139                .filter_map(|part| {
140                    if let MessagePart::ToolUse { id, .. } = part {
141                        Some(id.as_str())
142                    } else {
143                        None
144                    }
145                })
146                .collect()
147        })
148        .unwrap_or_default();
149    msg.parts
150        .iter()
151        .filter_map(|p| {
152            if let MessagePart::ToolResult { tool_use_id, .. } = p
153                && !avail.contains(tool_use_id.as_str())
154            {
155                Some(tool_use_id.clone())
156            } else {
157                None
158            }
159        })
160        .collect()
161}
162
/// Reports whether `content` holds any human-readable text once legacy tool bracket markers
/// (and the bodies attached to them) are ignored.
///
/// Recognised legacy markers, as produced by `Message::flatten_parts`:
/// - `[tool_use: name(id)]` — assistant `ToolUse`; only the tag itself is skipped
/// - `[tool_result: id]\nbody` — user `ToolResult`; the tag and everything up to the next
///   marker (or end of string) are skipped
/// - `[tool output: name] body` / `[tool output: name]\n```\nbody\n``` ` — `ToolOutput`;
///   skipped the same way as `[tool_result: …]`
///
/// Returns `false` only when the content consists solely of such markers, their bodies and
/// whitespace — such a message is a candidate for soft-delete once its structured `parts`
/// are gone.
///
/// Conservative rule: a marker without a closing `]` is treated as meaningful text, so the
/// function returns `true` and the message is NOT deleted.
///
/// Note: `[image: mime, N bytes]` placeholders are not markers here and therefore count as
/// meaningful — they represent real media content rather than tool-execution artifacts.
///
/// Note: the Claude request-builder format `[tool_use: name] {json_input}` is used only for
/// API payload construction and is never written to `SQLite`, so it needs no handling here.
fn has_meaningful_content(content: &str) -> bool {
    const TOOL_USE: &str = "[tool_use: ";
    const TOOL_RESULT: &str = "[tool_result: ";
    const TOOL_OUTPUT: &str = "[tool output: ";
    const MARKERS: [&str; 3] = [TOOL_USE, TOOL_RESULT, TOOL_OUTPUT];

    /// Position and text of the marker that occurs first in `text`, if any.
    fn earliest_marker(text: &str) -> Option<(usize, &'static str)> {
        MARKERS
            .iter()
            .filter_map(|marker| text.find(marker).map(|at| (at, *marker)))
            .min_by_key(|&(at, _)| at)
    }

    let mut rest = content.trim();

    while let Some((tag_at, marker)) = earliest_marker(rest) {
        let (lead, tagged) = rest.split_at(tag_at);

        // Text in front of the marker is real content.
        if !lead.trim().is_empty() {
            return true;
        }

        // A marker that is never closed is not trusted as a tool artifact.
        let after_marker = &tagged[marker.len()..];
        let Some(close_at) = after_marker.find(']') else {
            return true;
        };
        let after_tag = &after_marker[close_at + 1..];

        rest = if marker == TOOL_USE {
            after_tag
        } else {
            // Result/output tags own the body that follows them: skip ahead to the next
            // marker, or to the end of the string when there is none.
            let body = after_tag.trim_start_matches('\n');
            let skip = MARKERS
                .iter()
                .filter_map(|m| body.find(m))
                .min()
                .unwrap_or(body.len());
            &body[skip..]
        };
    }

    // No markers left — whatever text remains decides the verdict.
    !rest.trim().is_empty()
}
232
233/// Scan all messages and strip orphaned `ToolUse`/`ToolResult` parts from mid-history messages.
234///
235/// Two directions are checked:
236/// - Forward: assistant message has `ToolUse` parts not matched by `ToolResult` in the next user
237///   message — strip those `ToolUse` parts.
238/// - Reverse: user message has `ToolResult` parts whose `tool_use_id` is not present as a
239///   `ToolUse` in the preceding assistant message — strip those `ToolResult` parts.
240///
241/// Text parts are preserved; messages with no remaining content are removed entirely.
242///
243/// Returns `(count, db_ids)` where `count` is the number of messages removed entirely and
244/// `db_ids` contains the `metadata.db_id` values of those removed messages (for DB cleanup).
245fn strip_mid_history_orphans(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
246    let mut removed = 0;
247    let mut db_ids: Vec<i64> = Vec::new();
248    let mut i = 0;
249    while i < messages.len() {
250        // Forward pass: strip ToolUse parts from assistant messages that lack a matching
251        // ToolResult in the next user message. Only orphaned IDs are stripped — other ToolUse
252        // parts in the same message that DO have a matching ToolResult are preserved.
253        if messages[i].role == Role::Assistant
254            && messages[i]
255                .parts
256                .iter()
257                .any(|p| matches!(p, MessagePart::ToolUse { .. }))
258        {
259            let orphaned_ids = orphaned_tool_use_ids(&messages[i], messages.get(i + 1));
260            if !orphaned_ids.is_empty() {
261                tracing::warn!(
262                    tool_ids = ?orphaned_ids,
263                    index = i,
264                    "stripping orphaned mid-history tool_use parts from assistant message"
265                );
266                messages[i].parts.retain(
267                    |p| !matches!(p, MessagePart::ToolUse { id, .. } if orphaned_ids.contains(id)),
268                );
269                let is_empty =
270                    !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
271                if is_empty {
272                    if let Some(db_id) = messages[i].metadata.db_id {
273                        db_ids.push(db_id);
274                    }
275                    messages.remove(i);
276                    removed += 1;
277                    continue; // Do not advance i — the next message is now at position i.
278                }
279            }
280        }
281
282        // Reverse pass: user ToolResult without matching ToolUse in the preceding assistant message.
283        if messages[i].role == Role::User
284            && messages[i]
285                .parts
286                .iter()
287                .any(|p| matches!(p, MessagePart::ToolResult { .. }))
288        {
289            let orphaned_ids = orphaned_tool_result_ids(
290                &messages[i],
291                if i > 0 { messages.get(i - 1) } else { None },
292            );
293            if !orphaned_ids.is_empty() {
294                tracing::warn!(
295                    tool_use_ids = ?orphaned_ids,
296                    index = i,
297                    "stripping orphaned mid-history tool_result parts from user message"
298                );
299                messages[i].parts.retain(|p| {
300                    !matches!(p, MessagePart::ToolResult { tool_use_id, .. } if orphaned_ids.contains(tool_use_id.as_str()))
301                });
302
303                let is_empty =
304                    !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
305                if is_empty {
306                    if let Some(db_id) = messages[i].metadata.db_id {
307                        db_ids.push(db_id);
308                    }
309                    messages.remove(i);
310                    removed += 1;
311                    // Do not advance i — the next message is now at position i.
312                    continue;
313                }
314            }
315        }
316
317        i += 1;
318    }
319    (removed, db_ids)
320}
321
322impl<C: Channel> Agent<C> {
323    /// Load conversation history from memory and inject into messages.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if loading history from `SQLite` fails.
328    pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
329        let (Some(memory), Some(cid)) = (
330            &self.memory_state.persistence.memory,
331            self.memory_state.persistence.conversation_id,
332        ) else {
333            return Ok(());
334        };
335
336        let history = memory
337            .sqlite()
338            .load_history_filtered(
339                cid,
340                self.memory_state.persistence.history_limit,
341                Some(true),
342                None,
343            )
344            .await?;
345        if !history.is_empty() {
346            let mut loaded = 0;
347            let mut skipped = 0;
348
349            for msg in history {
350                // Only skip messages that have neither text content nor structured parts.
351                // Native tool calls produce user messages with empty `content` but non-empty
352                // `parts` (containing ToolResult). Skipping them here would orphan the
353                // preceding assistant ToolUse before sanitize_tool_pairs can clean it up.
354                if !has_meaningful_content(&msg.content) && msg.parts.is_empty() {
355                    tracing::warn!("skipping empty message from history (role: {:?})", msg.role);
356                    skipped += 1;
357                    continue;
358                }
359                self.msg.messages.push(msg);
360                loaded += 1;
361            }
362
363            // Determine the start index of just-loaded messages (system prompt is at index 0).
364            let history_start = self.msg.messages.len() - loaded;
365            let mut restored_slice = self.msg.messages.split_off(history_start);
366            let (orphans, orphan_db_ids) = sanitize_tool_pairs(&mut restored_slice);
367            skipped += orphans;
368            loaded = loaded.saturating_sub(orphans);
369            self.msg.messages.append(&mut restored_slice);
370
371            if !orphan_db_ids.is_empty() {
372                let ids: Vec<zeph_memory::types::MessageId> = orphan_db_ids
373                    .iter()
374                    .map(|&id| zeph_memory::types::MessageId(id))
375                    .collect();
376                if let Err(e) = memory.sqlite().soft_delete_messages(&ids).await {
377                    tracing::warn!(
378                        count = ids.len(),
379                        error = %e,
380                        "failed to soft-delete orphaned tool-pair messages from DB"
381                    );
382                } else {
383                    tracing::debug!(
384                        count = ids.len(),
385                        "soft-deleted orphaned tool-pair messages from DB"
386                    );
387                }
388            }
389
390            tracing::info!("restored {loaded} message(s) from conversation {cid}");
391            if skipped > 0 {
392                tracing::warn!("skipped {skipped} empty/orphaned message(s) from history");
393            }
394
395            if loaded > 0 {
396                // Increment session counts so tier promotion can track cross-session access.
397                // Errors are non-fatal — promotion will simply use stale counts.
398                let _ = memory
399                    .sqlite()
400                    .increment_session_counts_for_conversation(cid)
401                    .await
402                    .inspect_err(|e| {
403                        tracing::warn!(error = %e, "failed to increment tier session counts");
404                    });
405            }
406        }
407
408        if let Ok(count) = memory.message_count(cid).await {
409            let count_u64 = u64::try_from(count).unwrap_or(0);
410            self.update_metrics(|m| {
411                m.sqlite_message_count = count_u64;
412            });
413        }
414
415        if let Ok(count) = memory.sqlite().count_semantic_facts().await {
416            let count_u64 = u64::try_from(count).unwrap_or(0);
417            self.update_metrics(|m| {
418                m.semantic_fact_count = count_u64;
419            });
420        }
421
422        if let Ok(count) = memory.unsummarized_message_count(cid).await {
423            self.memory_state.persistence.unsummarized_count = usize::try_from(count).unwrap_or(0);
424        }
425
426        self.recompute_prompt_tokens();
427        Ok(())
428    }
429
430    /// Persist a message to memory.
431    ///
432    /// `has_injection_flags` controls whether Qdrant embedding is skipped for this message.
433    /// When `true` and `guard_memory_writes` is enabled, only `SQLite` is written — the message
434    /// is saved for conversation continuity but will not pollute semantic search (M2, D2).
435    #[allow(clippy::too_many_lines)]
436    #[cfg_attr(
437        feature = "profiling",
438        tracing::instrument(name = "agent.persist_message", skip_all)
439    )]
440    pub(crate) async fn persist_message(
441        &mut self,
442        role: Role,
443        content: &str,
444        parts: &[MessagePart],
445        has_injection_flags: bool,
446    ) {
447        let (Some(memory), Some(cid)) = (
448            &self.memory_state.persistence.memory,
449            self.memory_state.persistence.conversation_id,
450        ) else {
451            return;
452        };
453
454        let parts_json = if parts.is_empty() {
455            "[]".to_string()
456        } else {
457            serde_json::to_string(parts).unwrap_or_else(|e| {
458                tracing::warn!("failed to serialize message parts, storing empty: {e}");
459                "[]".to_string()
460            })
461        };
462
463        // M2: injection flag is passed explicitly to avoid stale mutable-bool state on Agent.
464        // When has_injection_flags=true, skip embedding to prevent poisoned content from
465        // polluting Qdrant semantic search results.
466        let guard_event = self
467            .security
468            .exfiltration_guard
469            .should_guard_memory_write(has_injection_flags);
470        if let Some(ref event) = guard_event {
471            tracing::warn!(
472                ?event,
473                "exfiltration guard: skipping Qdrant embedding for flagged content"
474            );
475            self.update_metrics(|m| m.exfiltration_memory_guards += 1);
476            self.push_security_event(
477                crate::metrics::SecurityEventCategory::ExfiltrationBlock,
478                "memory_write",
479                "Qdrant embedding skipped: flagged content",
480            );
481        }
482
483        let skip_embedding = guard_event.is_some();
484
485        // Do not embed [skipped] or [stopped] ToolResult content into Qdrant — these are
486        // internal policy markers that carry no useful semantic information and would
487        // contaminate memory_search results, causing the utility-gate Retrieve loop (#2620).
488        let has_skipped_tool_result = parts.iter().any(|p| {
489            if let MessagePart::ToolResult { content, .. } = p {
490                content.starts_with("[skipped]") || content.starts_with("[stopped]")
491            } else {
492                false
493            }
494        });
495
496        let should_embed = if skip_embedding || has_skipped_tool_result {
497            false
498        } else {
499            match role {
500                Role::Assistant => {
501                    self.memory_state.persistence.autosave_assistant
502                        && content.len() >= self.memory_state.persistence.autosave_min_length
503                }
504                _ => true,
505            }
506        };
507
508        let goal_text = self.memory_state.extraction.goal_text.clone();
509
510        tracing::debug!(
511            "persist_message: calling remember_with_parts, embed dispatched to background"
512        );
513        let (embedding_stored, was_persisted) = if should_embed {
514            match memory
515                .remember_with_parts(
516                    cid,
517                    role_str(role),
518                    content,
519                    &parts_json,
520                    goal_text.as_deref(),
521                )
522                .await
523            {
524                Ok((Some(message_id), stored)) => {
525                    self.msg.last_persisted_message_id = Some(message_id.0);
526                    (stored, true)
527                }
528                Ok((None, _)) => {
529                    // A-MAC admission rejected — skip increment and further processing.
530                    return;
531                }
532                Err(e) => {
533                    tracing::error!("failed to persist message: {e:#}");
534                    return;
535                }
536            }
537        } else {
538            match memory
539                .save_only(cid, role_str(role), content, &parts_json)
540                .await
541            {
542                Ok(message_id) => {
543                    self.msg.last_persisted_message_id = Some(message_id.0);
544                    (false, true)
545                }
546                Err(e) => {
547                    tracing::error!("failed to persist message: {e:#}");
548                    return;
549                }
550            }
551        };
552
553        if !was_persisted {
554            return;
555        }
556
557        self.memory_state.persistence.unsummarized_count += 1;
558
559        self.update_metrics(|m| {
560            m.sqlite_message_count += 1;
561            if embedding_stored {
562                m.embeddings_generated += 1;
563            }
564        });
565
566        tracing::debug!("persist_message: db insert complete, embedding running in background");
567        memory.reap_embed_tasks();
568
569        // Phase 2: enqueue enrichment tasks via supervisor (non-blocking).
570        // check_summarization signals completion via SummarizationSignal, consumed in reap()
571        // between turns — no shared mutable state across tasks (S1 fix).
572        self.enqueue_summarization_task();
573
574        // FIX-1: skip graph extraction for tool result messages — they contain raw structured
575        // output (TOML, JSON, code) that pollutes the entity graph with noise.
576        let has_tool_result_parts = parts
577            .iter()
578            .any(|p| matches!(p, MessagePart::ToolResult { .. }));
579
580        self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
581            .await;
582
583        // Persona extraction: run only for user messages that are not tool results and not injected.
584        if role == Role::User && !has_tool_result_parts && !has_injection_flags {
585            self.enqueue_persona_extraction_task();
586        }
587
588        // Trajectory extraction: run after turns that contained tool results.
589        if has_tool_result_parts {
590            self.enqueue_trajectory_extraction_task();
591        }
592    }
593
594    /// Enqueue background summarization via the supervisor (S1 fix: no shared `AtomicUsize`).
595    fn enqueue_summarization_task(&mut self) {
596        let (Some(memory), Some(cid)) = (
597            self.memory_state.persistence.memory.clone(),
598            self.memory_state.persistence.conversation_id,
599        ) else {
600            return;
601        };
602
603        if self.memory_state.persistence.unsummarized_count
604            <= self.memory_state.compaction.summarization_threshold
605        {
606            return;
607        }
608
609        let batch_size = self.memory_state.compaction.summarization_threshold / 2;
610
611        self.lifecycle.supervisor.spawn_summarization("summarization", async move {
612            match tokio::time::timeout(
613                std::time::Duration::from_secs(30),
614                memory.summarize(cid, batch_size),
615            )
616            .await
617            {
618                Ok(Ok(Some(summary_id))) => {
619                    tracing::info!(
620                        "background summarization: created summary {summary_id} for conversation {cid}"
621                    );
622                    true
623                }
624                Ok(Ok(None)) => {
625                    tracing::debug!("background summarization: no summarization needed");
626                    false
627                }
628                Ok(Err(e)) => {
629                    tracing::error!("background summarization failed: {e:#}");
630                    false
631                }
632                Err(_) => {
633                    tracing::warn!("background summarization timed out after 30s");
634                    false
635                }
636            }
637        });
638    }
639
640    /// Prepare graph extraction guards in foreground, then enqueue heavy work via supervisor.
641    ///
642    /// Guards (enabled check, injection/tool-result skip) stay on the foreground path.
643    /// The RPE check and actual extraction run in background (S2: no `send_status`).
644    #[allow(clippy::too_many_lines)]
645    async fn enqueue_graph_extraction_task(
646        &mut self,
647        content: &str,
648        has_injection_flags: bool,
649        has_tool_result_parts: bool,
650    ) {
651        use zeph_memory::semantic::GraphExtractionConfig;
652
653        if self.memory_state.persistence.memory.is_none()
654            || self.memory_state.persistence.conversation_id.is_none()
655        {
656            return;
657        }
658        if has_tool_result_parts {
659            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
660            return;
661        }
662        if has_injection_flags {
663            tracing::warn!("graph extraction skipped: injection patterns detected in content");
664            return;
665        }
666
667        let extraction_cfg = {
668            let cfg = &self.memory_state.extraction.graph_config;
669            if !cfg.enabled {
670                return;
671            }
672            GraphExtractionConfig {
673                max_entities: cfg.max_entities_per_message,
674                max_edges: cfg.max_edges_per_message,
675                extraction_timeout_secs: cfg.extraction_timeout_secs,
676                community_refresh_interval: cfg.community_refresh_interval,
677                expired_edge_retention_days: cfg.expired_edge_retention_days,
678                max_entities_cap: cfg.max_entities,
679                community_summary_max_prompt_bytes: cfg.community_summary_max_prompt_bytes,
680                community_summary_concurrency: cfg.community_summary_concurrency,
681                lpa_edge_chunk_size: cfg.lpa_edge_chunk_size,
682                note_linking: zeph_memory::NoteLinkingConfig {
683                    enabled: cfg.note_linking.enabled,
684                    similarity_threshold: cfg.note_linking.similarity_threshold,
685                    top_k: cfg.note_linking.top_k,
686                    timeout_secs: cfg.note_linking.timeout_secs,
687                },
688                link_weight_decay_lambda: cfg.link_weight_decay_lambda,
689                link_weight_decay_interval_secs: cfg.link_weight_decay_interval_secs,
690                belief_revision_enabled: cfg.belief_revision.enabled,
691                belief_revision_similarity_threshold: cfg.belief_revision.similarity_threshold,
692                conversation_id: self.memory_state.persistence.conversation_id.map(|c| c.0),
693            }
694        };
695
696        // RPE check: embed + compute surprise score. Stays on foreground to avoid
697        // capturing the rpe_router mutex in a background task.
698        if self.rpe_should_skip(content).await {
699            tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
700            return;
701        }
702
703        let context_messages: Vec<String> = self
704            .msg
705            .messages
706            .iter()
707            .rev()
708            .filter(|m| {
709                m.role == Role::User
710                    && !m
711                        .parts
712                        .iter()
713                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
714            })
715            .take(4)
716            .map(|m| {
717                if m.content.len() > 2048 {
718                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
719                } else {
720                    m.content.clone()
721                }
722            })
723            .collect();
724
725        let Some(memory) = self.memory_state.persistence.memory.clone() else {
726            return;
727        };
728
729        let validator: zeph_memory::semantic::PostExtractValidator =
730            if self.security.memory_validator.is_enabled() {
731                let v = self.security.memory_validator.clone();
732                Some(Box::new(move |result| {
733                    v.validate_graph_extraction(result)
734                        .map_err(|e| e.to_string())
735                }))
736            } else {
737                None
738            };
739
740        let content_owned = content.to_owned();
741        let graph_store = memory.graph_store.clone();
742        let metrics_tx = self.metrics.metrics_tx.clone();
743        let start_time = self.lifecycle.start_time;
744
745        self.lifecycle.supervisor.spawn(
746            super::agent_supervisor::TaskClass::Enrichment,
747            "graph_extraction",
748            async move {
749                let extraction_handle = memory.spawn_graph_extraction(
750                    content_owned,
751                    context_messages,
752                    extraction_cfg,
753                    validator,
754                );
755
756                // After extraction completes, refresh graph count metrics (was Telemetry spawn).
757                if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
758                    let _ = extraction_handle.await;
759                    let (entities, edges, communities) = tokio::join!(
760                        store.entity_count(),
761                        store.active_edge_count(),
762                        store.community_count()
763                    );
764                    let elapsed = start_time.elapsed().as_secs();
765                    tx.send_modify(|m| {
766                        m.uptime_seconds = elapsed;
767                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
768                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
769                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
770                    });
771                } else {
772                    let _ = extraction_handle.await;
773                }
774
775                tracing::debug!("background graph extraction complete");
776            },
777        );
778
779        // Sync community failures and extraction metrics (cheap, foreground-safe).
780        self.sync_community_detection_failures();
781        self.sync_graph_extraction_metrics();
782        // sync_graph_counts and sync_guidelines_status are DB reads; move to Telemetry background.
783        let memory_for_sync = self.memory_state.persistence.memory.clone();
784        let metrics_tx_sync = self.metrics.metrics_tx.clone();
785        let start_time_sync = self.lifecycle.start_time;
786        let cid_sync = self.memory_state.persistence.conversation_id;
787        let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
788        let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
789        let guidelines_enabled = self.memory_state.extraction.graph_config.enabled;
790
791        self.lifecycle.supervisor.spawn(
792            super::agent_supervisor::TaskClass::Telemetry,
793            "graph_count_sync",
794            async move {
795                let Some(store) = graph_store_sync else {
796                    return;
797                };
798                let Some(tx) = metrics_tx_sync else { return };
799
800                let (entities, edges, communities) = tokio::join!(
801                    store.entity_count(),
802                    store.active_edge_count(),
803                    store.community_count()
804                );
805                let elapsed = start_time_sync.elapsed().as_secs();
806                tx.send_modify(|m| {
807                    m.uptime_seconds = elapsed;
808                    m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
809                    m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
810                    m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
811                });
812
813                // Sync guidelines status.
814                if guidelines_enabled && let Some(sqlite) = sqlite_sync {
815                    match tokio::time::timeout(
816                        std::time::Duration::from_secs(10),
817                        sqlite.load_compression_guidelines_meta(cid_sync),
818                    )
819                    .await
820                    {
821                        Ok(Ok((version, created_at))) => {
822                            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
823                            let version_u32 = u32::try_from(version).unwrap_or(0);
824                            tx.send_modify(|m| {
825                                m.guidelines_version = version_u32;
826                                m.guidelines_updated_at = created_at;
827                            });
828                        }
829                        Ok(Err(e)) => {
830                            tracing::debug!("guidelines status sync failed: {e:#}");
831                        }
832                        Err(_) => {
833                            tracing::debug!("guidelines status sync timed out");
834                        }
835                    }
836                }
837            },
838        );
839    }
840
841    /// Enqueue persona extraction via supervisor (background, no `send_status`).
842    fn enqueue_persona_extraction_task(&mut self) {
843        use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
844
845        let cfg = &self.memory_state.extraction.persona_config;
846        if !cfg.enabled {
847            return;
848        }
849
850        let Some(memory) = &self.memory_state.persistence.memory else {
851            return;
852        };
853
854        let user_messages: Vec<String> = self
855            .msg
856            .messages
857            .iter()
858            .filter(|m| {
859                m.role == Role::User
860                    && !m
861                        .parts
862                        .iter()
863                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
864            })
865            .take(8)
866            .map(|m| {
867                if m.content.len() > 2048 {
868                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
869                } else {
870                    m.content.clone()
871                }
872            })
873            .collect();
874
875        if user_messages.len() < cfg.min_messages {
876            return;
877        }
878
879        let timeout_secs = cfg.extraction_timeout_secs;
880        let extraction_cfg = PersonaExtractionConfig {
881            enabled: cfg.enabled,
882            min_messages: cfg.min_messages,
883            max_messages: cfg.max_messages,
884            extraction_timeout_secs: timeout_secs,
885        };
886
887        let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
888        let store = memory.sqlite().clone();
889        let conversation_id = self.memory_state.persistence.conversation_id.map(|c| c.0);
890
891        self.lifecycle.supervisor.spawn(
892            super::agent_supervisor::TaskClass::Enrichment,
893            "persona_extraction",
894            async move {
895                let user_message_refs: Vec<&str> =
896                    user_messages.iter().map(String::as_str).collect();
897                let fut = extract_persona_facts(
898                    &store,
899                    &provider,
900                    &user_message_refs,
901                    &extraction_cfg,
902                    conversation_id,
903                );
904                match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
905                {
906                    Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
907                    Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
908                    Err(_) => tracing::warn!(
909                        timeout_secs,
910                        "persona extraction timed out — no facts written this turn"
911                    ),
912                }
913            },
914        );
915    }
916
917    /// Enqueue trajectory extraction via supervisor (background).
918    fn enqueue_trajectory_extraction_task(&mut self) {
919        use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
920
921        let cfg = self.memory_state.extraction.trajectory_config.clone();
922        if !cfg.enabled {
923            return;
924        }
925
926        let Some(memory) = &self.memory_state.persistence.memory else {
927            return;
928        };
929
930        let conversation_id = match self.memory_state.persistence.conversation_id {
931            Some(cid) => cid.0,
932            None => return,
933        };
934
935        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
936        let turn_messages: Vec<zeph_llm::provider::Message> =
937            self.msg.messages[tail_start..].to_vec();
938
939        if turn_messages.is_empty() {
940            return;
941        }
942
943        let extraction_cfg = TrajectoryExtractionConfig {
944            enabled: cfg.enabled,
945            max_messages: cfg.max_messages,
946            extraction_timeout_secs: cfg.extraction_timeout_secs,
947        };
948
949        let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
950        let store = memory.sqlite().clone();
951        let min_confidence = cfg.min_confidence;
952
953        self.lifecycle.supervisor.spawn(
954            super::agent_supervisor::TaskClass::Enrichment,
955            "trajectory_extraction",
956            async move {
957                let entries =
958                    match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
959                        .await
960                    {
961                        Ok(e) => e,
962                        Err(e) => {
963                            tracing::warn!(error = %e, "trajectory extraction failed");
964                            return;
965                        }
966                    };
967
968                let last_id = store
969                    .trajectory_last_extracted_message_id(conversation_id)
970                    .await
971                    .unwrap_or(0);
972
973                let mut max_id = last_id;
974                for entry in &entries {
975                    if entry.confidence < min_confidence {
976                        continue;
977                    }
978                    let tools_json = serde_json::to_string(&entry.tools_used)
979                        .unwrap_or_else(|_| "[]".to_string());
980                    match store
981                        .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
982                            conversation_id: Some(conversation_id),
983                            turn_index: 0,
984                            kind: &entry.kind,
985                            intent: &entry.intent,
986                            outcome: &entry.outcome,
987                            tools_used: &tools_json,
988                            confidence: entry.confidence,
989                        })
990                        .await
991                    {
992                        Ok(id) => {
993                            if id > max_id {
994                                max_id = id;
995                            }
996                        }
997                        Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
998                    }
999                }
1000
1001                if max_id > last_id {
1002                    let _ = store
1003                        .set_trajectory_last_extracted_message_id(conversation_id, max_id)
1004                        .await;
1005                }
1006
1007                tracing::debug!(
1008                    count = entries.len(),
1009                    conversation_id,
1010                    "trajectory extraction complete"
1011                );
1012            },
1013        );
1014    }
1015
1016    /// D-MEM RPE check: returns `true` when the current turn should skip graph extraction.
1017    ///
1018    /// Embeds `content`, computes RPE via the router, and updates the router state.
1019    /// Returns `false` (do not skip) on any error — conservative fallback.
1020    async fn rpe_should_skip(&mut self, content: &str) -> bool {
1021        let Some(ref rpe_mutex) = self.memory_state.extraction.rpe_router else {
1022            return false;
1023        };
1024        let Some(memory) = &self.memory_state.persistence.memory else {
1025            return false;
1026        };
1027        let candidates = zeph_memory::extract_candidate_entities(content);
1028        let provider = memory.provider();
1029        let Ok(Ok(emb_vec)) =
1030            tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
1031        else {
1032            return false; // embed failed/timed out → extract
1033        };
1034        if let Ok(mut router) = rpe_mutex.lock() {
1035            let signal = router.compute(&emb_vec, &candidates);
1036            router.push_embedding(emb_vec);
1037            router.push_entities(&candidates);
1038            !signal.should_extract
1039        } else {
1040            tracing::warn!("rpe_router mutex poisoned; falling through to extract");
1041            false
1042        }
1043    }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048    use super::super::agent_tests::{
1049        MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
1050    };
1051    use super::*;
1052    use zeph_llm::any::AnyProvider;
1053    use zeph_memory::semantic::SemanticMemory;
1054
1055    async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
1056        SemanticMemory::new(
1057            ":memory:",
1058            "http://127.0.0.1:1",
1059            provider.clone(),
1060            "test-model",
1061        )
1062        .await
1063        .unwrap()
1064    }
1065
1066    #[tokio::test]
1067    async fn load_history_without_memory_returns_ok() {
1068        let provider = mock_provider(vec![]);
1069        let channel = MockChannel::new(vec![]);
1070        let registry = create_test_registry();
1071        let executor = MockToolExecutor::no_tools();
1072        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1073
1074        let result = agent.load_history().await;
1075        assert!(result.is_ok());
1076        // No messages added when no memory is configured
1077        assert_eq!(agent.msg.messages.len(), 1); // system prompt only
1078    }
1079
1080    #[tokio::test]
1081    async fn load_history_with_messages_injects_into_agent() {
1082        let provider = mock_provider(vec![]);
1083        let channel = MockChannel::new(vec![]);
1084        let registry = create_test_registry();
1085        let executor = MockToolExecutor::no_tools();
1086
1087        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1088        let cid = memory.sqlite().create_conversation().await.unwrap();
1089
1090        memory
1091            .sqlite()
1092            .save_message(cid, "user", "hello from history")
1093            .await
1094            .unwrap();
1095        memory
1096            .sqlite()
1097            .save_message(cid, "assistant", "hi back")
1098            .await
1099            .unwrap();
1100
1101        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1102            std::sync::Arc::new(memory),
1103            cid,
1104            50,
1105            5,
1106            100,
1107        );
1108
1109        let messages_before = agent.msg.messages.len();
1110        agent.load_history().await.unwrap();
1111        // Two messages were added from history
1112        assert_eq!(agent.msg.messages.len(), messages_before + 2);
1113    }
1114
1115    #[tokio::test]
1116    async fn load_history_skips_empty_messages() {
1117        let provider = mock_provider(vec![]);
1118        let channel = MockChannel::new(vec![]);
1119        let registry = create_test_registry();
1120        let executor = MockToolExecutor::no_tools();
1121
1122        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1123        let cid = memory.sqlite().create_conversation().await.unwrap();
1124
1125        // Save an empty message (should be skipped) and a valid one
1126        memory
1127            .sqlite()
1128            .save_message(cid, "user", "   ")
1129            .await
1130            .unwrap();
1131        memory
1132            .sqlite()
1133            .save_message(cid, "user", "real message")
1134            .await
1135            .unwrap();
1136
1137        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1138            std::sync::Arc::new(memory),
1139            cid,
1140            50,
1141            5,
1142            100,
1143        );
1144
1145        let messages_before = agent.msg.messages.len();
1146        agent.load_history().await.unwrap();
1147        // Only the non-empty message is loaded
1148        assert_eq!(agent.msg.messages.len(), messages_before + 1);
1149    }
1150
1151    #[tokio::test]
1152    async fn load_history_with_empty_store_returns_ok() {
1153        let provider = mock_provider(vec![]);
1154        let channel = MockChannel::new(vec![]);
1155        let registry = create_test_registry();
1156        let executor = MockToolExecutor::no_tools();
1157
1158        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1159        let cid = memory.sqlite().create_conversation().await.unwrap();
1160
1161        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1162            std::sync::Arc::new(memory),
1163            cid,
1164            50,
1165            5,
1166            100,
1167        );
1168
1169        let messages_before = agent.msg.messages.len();
1170        agent.load_history().await.unwrap();
1171        // No messages added — empty history
1172        assert_eq!(agent.msg.messages.len(), messages_before);
1173    }
1174
1175    #[tokio::test]
1176    async fn load_history_increments_session_count_for_existing_messages() {
1177        let provider = mock_provider(vec![]);
1178        let channel = MockChannel::new(vec![]);
1179        let registry = create_test_registry();
1180        let executor = MockToolExecutor::no_tools();
1181
1182        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1183        let cid = memory.sqlite().create_conversation().await.unwrap();
1184
1185        // Save two messages — they start with session_count = 0.
1186        let id1 = memory
1187            .sqlite()
1188            .save_message(cid, "user", "hello")
1189            .await
1190            .unwrap();
1191        let id2 = memory
1192            .sqlite()
1193            .save_message(cid, "assistant", "hi")
1194            .await
1195            .unwrap();
1196
1197        let memory_arc = std::sync::Arc::new(memory);
1198        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1199            memory_arc.clone(),
1200            cid,
1201            50,
1202            5,
1203            100,
1204        );
1205
1206        agent.load_history().await.unwrap();
1207
1208        // Both episodic messages must have session_count = 1 after restore.
1209        let counts: Vec<i64> = zeph_db::query_scalar(
1210            "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
1211        )
1212        .bind(id1)
1213        .bind(id2)
1214        .fetch_all(memory_arc.sqlite().pool())
1215        .await
1216        .unwrap();
1217        assert_eq!(
1218            counts,
1219            vec![1, 1],
1220            "session_count must be 1 after first restore"
1221        );
1222    }
1223
1224    #[tokio::test]
1225    async fn load_history_does_not_increment_session_count_for_new_conversation() {
1226        let provider = mock_provider(vec![]);
1227        let channel = MockChannel::new(vec![]);
1228        let registry = create_test_registry();
1229        let executor = MockToolExecutor::no_tools();
1230
1231        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1232        let cid = memory.sqlite().create_conversation().await.unwrap();
1233
1234        // No messages saved — empty conversation.
1235        let memory_arc = std::sync::Arc::new(memory);
1236        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1237            memory_arc.clone(),
1238            cid,
1239            50,
1240            5,
1241            100,
1242        );
1243
1244        agent.load_history().await.unwrap();
1245
1246        // No rows → no session_count increments → query returns empty.
1247        let counts: Vec<i64> =
1248            zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
1249                .bind(cid)
1250                .fetch_all(memory_arc.sqlite().pool())
1251                .await
1252                .unwrap();
1253        assert!(counts.is_empty(), "new conversation must have no messages");
1254    }
1255
1256    #[tokio::test]
1257    async fn persist_message_without_memory_silently_returns() {
1258        // No memory configured — persist_message must not panic
1259        let provider = mock_provider(vec![]);
1260        let channel = MockChannel::new(vec![]);
1261        let registry = create_test_registry();
1262        let executor = MockToolExecutor::no_tools();
1263        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1264
1265        // Must not panic and must complete
1266        agent.persist_message(Role::User, "hello", &[], false).await;
1267    }
1268
1269    #[tokio::test]
1270    async fn persist_message_assistant_autosave_false_uses_save_only() {
1271        let provider = mock_provider(vec![]);
1272        let channel = MockChannel::new(vec![]);
1273        let registry = create_test_registry();
1274        let executor = MockToolExecutor::no_tools();
1275
1276        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1277        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1278        let cid = memory.sqlite().create_conversation().await.unwrap();
1279
1280        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1281            .with_metrics(tx)
1282            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1283        agent.memory_state.persistence.autosave_assistant = false;
1284        agent.memory_state.persistence.autosave_min_length = 20;
1285
1286        agent
1287            .persist_message(Role::Assistant, "short assistant reply", &[], false)
1288            .await;
1289
1290        let history = agent
1291            .memory_state
1292            .persistence
1293            .memory
1294            .as_ref()
1295            .unwrap()
1296            .sqlite()
1297            .load_history(cid, 50)
1298            .await
1299            .unwrap();
1300        assert_eq!(history.len(), 1, "message must be saved");
1301        assert_eq!(history[0].content, "short assistant reply");
1302        // embeddings_generated must remain 0 — save_only path does not embed
1303        assert_eq!(rx.borrow().embeddings_generated, 0);
1304    }
1305
1306    #[tokio::test]
1307    async fn persist_message_assistant_below_min_length_uses_save_only() {
1308        let provider = mock_provider(vec![]);
1309        let channel = MockChannel::new(vec![]);
1310        let registry = create_test_registry();
1311        let executor = MockToolExecutor::no_tools();
1312
1313        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1314        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1315        let cid = memory.sqlite().create_conversation().await.unwrap();
1316
1317        // autosave_assistant=true but min_length=1000 — short content falls back to save_only
1318        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1319            .with_metrics(tx)
1320            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1321        agent.memory_state.persistence.autosave_assistant = true;
1322        agent.memory_state.persistence.autosave_min_length = 1000;
1323
1324        agent
1325            .persist_message(Role::Assistant, "too short", &[], false)
1326            .await;
1327
1328        let history = agent
1329            .memory_state
1330            .persistence
1331            .memory
1332            .as_ref()
1333            .unwrap()
1334            .sqlite()
1335            .load_history(cid, 50)
1336            .await
1337            .unwrap();
1338        assert_eq!(history.len(), 1, "message must be saved");
1339        assert_eq!(history[0].content, "too short");
1340        assert_eq!(rx.borrow().embeddings_generated, 0);
1341    }
1342
1343    #[tokio::test]
1344    async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1345        // content.len() == autosave_min_length → should_embed = true (>= boundary).
1346        let provider = mock_provider(vec![]);
1347        let channel = MockChannel::new(vec![]);
1348        let registry = create_test_registry();
1349        let executor = MockToolExecutor::no_tools();
1350
1351        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1352        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1353        let cid = memory.sqlite().create_conversation().await.unwrap();
1354
1355        let min_length = 10usize;
1356        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1357            .with_metrics(tx)
1358            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1359        agent.memory_state.persistence.autosave_assistant = true;
1360        agent.memory_state.persistence.autosave_min_length = min_length;
1361
1362        // Exact boundary: len == min_length → embed path.
1363        let content_at_boundary = "A".repeat(min_length);
1364        assert_eq!(content_at_boundary.len(), min_length);
1365        agent
1366            .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1367            .await;
1368
1369        // sqlite_message_count must be incremented regardless of embedding success.
1370        assert_eq!(rx.borrow().sqlite_message_count, 1);
1371    }
1372
1373    #[tokio::test]
1374    async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1375        // content.len() == autosave_min_length - 1 → should_embed = false (below boundary).
1376        let provider = mock_provider(vec![]);
1377        let channel = MockChannel::new(vec![]);
1378        let registry = create_test_registry();
1379        let executor = MockToolExecutor::no_tools();
1380
1381        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1382        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1383        let cid = memory.sqlite().create_conversation().await.unwrap();
1384
1385        let min_length = 10usize;
1386        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1387            .with_metrics(tx)
1388            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1389        agent.memory_state.persistence.autosave_assistant = true;
1390        agent.memory_state.persistence.autosave_min_length = min_length;
1391
1392        // One below boundary: len == min_length - 1 → save_only path, no embedding.
1393        let content_below_boundary = "A".repeat(min_length - 1);
1394        assert_eq!(content_below_boundary.len(), min_length - 1);
1395        agent
1396            .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1397            .await;
1398
1399        let history = agent
1400            .memory_state
1401            .persistence
1402            .memory
1403            .as_ref()
1404            .unwrap()
1405            .sqlite()
1406            .load_history(cid, 50)
1407            .await
1408            .unwrap();
1409        assert_eq!(history.len(), 1, "message must still be saved");
1410        // save_only path does not embed.
1411        assert_eq!(rx.borrow().embeddings_generated, 0);
1412    }
1413
1414    #[tokio::test]
1415    async fn persist_message_increments_unsummarized_count() {
1416        let provider = mock_provider(vec![]);
1417        let channel = MockChannel::new(vec![]);
1418        let registry = create_test_registry();
1419        let executor = MockToolExecutor::no_tools();
1420
1421        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1422        let cid = memory.sqlite().create_conversation().await.unwrap();
1423
1424        // threshold=100 ensures no summarization is triggered
1425        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1426            std::sync::Arc::new(memory),
1427            cid,
1428            50,
1429            5,
1430            100,
1431        );
1432
1433        assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1434
1435        agent.persist_message(Role::User, "first", &[], false).await;
1436        assert_eq!(agent.memory_state.persistence.unsummarized_count, 1);
1437
1438        agent
1439            .persist_message(Role::User, "second", &[], false)
1440            .await;
1441        assert_eq!(agent.memory_state.persistence.unsummarized_count, 2);
1442    }
1443
1444    #[tokio::test]
1445    async fn check_summarization_resets_counter_on_success() {
1446        let provider = mock_provider(vec![]);
1447        let channel = MockChannel::new(vec![]);
1448        let registry = create_test_registry();
1449        let executor = MockToolExecutor::no_tools();
1450
1451        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1452        let cid = memory.sqlite().create_conversation().await.unwrap();
1453
1454        // threshold=1 so the second persist triggers summarization check (count > threshold)
1455        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1456            std::sync::Arc::new(memory),
1457            cid,
1458            50,
1459            5,
1460            1,
1461        );
1462
1463        agent.persist_message(Role::User, "msg1", &[], false).await;
1464        agent.persist_message(Role::User, "msg2", &[], false).await;
1465
1466        // After summarization attempt (summarize returns Ok(None) since no messages qualify),
1467        // the counter is NOT reset to 0 — only reset on Ok(Some(_)).
1468        // This verifies check_summarization is called and the guard condition works.
1469        // unsummarized_count must be >= 2 before any summarization or 0 if summarization ran.
1470        assert!(agent.memory_state.persistence.unsummarized_count <= 2);
1471    }
1472
1473    #[tokio::test]
1474    async fn unsummarized_count_not_incremented_without_memory() {
1475        let provider = mock_provider(vec![]);
1476        let channel = MockChannel::new(vec![]);
1477        let registry = create_test_registry();
1478        let executor = MockToolExecutor::no_tools();
1479        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1480
1481        agent.persist_message(Role::User, "hello", &[], false).await;
1482        // No memory configured — persist_message returns early, counter must stay 0.
1483        assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1484    }
1485
1486    // R-CRIT-01: unit tests for enqueue_graph_extraction_task guard conditions.
1487    mod graph_extraction_guards {
1488        use super::*;
1489        use crate::config::GraphConfig;
1490        use zeph_llm::provider::MessageMetadata;
1491        use zeph_memory::graph::GraphStore;
1492
1493        fn enabled_graph_config() -> GraphConfig {
1494            GraphConfig {
1495                enabled: true,
1496                ..GraphConfig::default()
1497            }
1498        }
1499
1500        async fn agent_with_graph(
1501            provider: &AnyProvider,
1502            config: GraphConfig,
1503        ) -> Agent<MockChannel> {
1504            let memory =
1505                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1506            let cid = memory.sqlite().create_conversation().await.unwrap();
1507            Agent::new(
1508                provider.clone(),
1509                MockChannel::new(vec![]),
1510                create_test_registry(),
1511                None,
1512                5,
1513                MockToolExecutor::no_tools(),
1514            )
1515            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1516            .with_graph_config(config)
1517        }
1518
1519        #[tokio::test]
1520        async fn injection_flag_guard_skips_extraction() {
1521            // has_injection_flags=true → extraction must be skipped; no counter in graph_metadata.
1522            let provider = mock_provider(vec![]);
1523            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1524            let pool = agent
1525                .memory_state
1526                .persistence
1527                .memory
1528                .as_ref()
1529                .unwrap()
1530                .sqlite()
1531                .pool()
1532                .clone();
1533
1534            agent
1535                .enqueue_graph_extraction_task("I use Rust", true, false)
1536                .await;
1537
1538            // Give any accidental spawn time to settle.
1539            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1540
1541            let store = GraphStore::new(pool);
1542            let count = store.get_metadata("extraction_count").await.unwrap();
1543            assert!(
1544                count.is_none(),
1545                "injection flag must prevent extraction_count from being written"
1546            );
1547        }
1548
1549        #[tokio::test]
1550        async fn disabled_config_guard_skips_extraction() {
1551            // graph.enabled=false → extraction must be skipped.
1552            let provider = mock_provider(vec![]);
1553            let disabled_cfg = GraphConfig {
1554                enabled: false,
1555                ..GraphConfig::default()
1556            };
1557            let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1558            let pool = agent
1559                .memory_state
1560                .persistence
1561                .memory
1562                .as_ref()
1563                .unwrap()
1564                .sqlite()
1565                .pool()
1566                .clone();
1567
1568            agent
1569                .enqueue_graph_extraction_task("I use Rust", false, false)
1570                .await;
1571
1572            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1573
1574            let store = GraphStore::new(pool);
1575            let count = store.get_metadata("extraction_count").await.unwrap();
1576            assert!(
1577                count.is_none(),
1578                "disabled graph config must prevent extraction"
1579            );
1580        }
1581
1582        #[tokio::test]
1583        async fn happy_path_fires_extraction() {
1584            // With enabled config and no injection flags, extraction is spawned.
1585            // MockProvider returns None (no entities), but the counter must be incremented.
1586            let provider = mock_provider(vec![]);
1587            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1588            let pool = agent
1589                .memory_state
1590                .persistence
1591                .memory
1592                .as_ref()
1593                .unwrap()
1594                .sqlite()
1595                .pool()
1596                .clone();
1597
1598            agent
1599                .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1600                .await;
1601
1602            // Wait for the spawned task to complete.
1603            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1604
1605            let store = GraphStore::new(pool);
1606            let count = store.get_metadata("extraction_count").await.unwrap();
1607            assert!(
1608                count.is_some(),
1609                "happy-path extraction must increment extraction_count"
1610            );
1611        }
1612
1613        #[tokio::test]
1614        async fn tool_result_parts_guard_skips_extraction() {
1615            // FIX-1 regression: has_tool_result_parts=true → extraction must be skipped.
1616            // Tool result messages contain raw structured output (TOML, JSON, code) — not
1617            // conversational content. Extracting entities from them produces graph noise.
1618            let provider = mock_provider(vec![]);
1619            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1620            let pool = agent
1621                .memory_state
1622                .persistence
1623                .memory
1624                .as_ref()
1625                .unwrap()
1626                .sqlite()
1627                .pool()
1628                .clone();
1629
1630            agent
1631                .enqueue_graph_extraction_task(
1632                    "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1633                    false,
1634                    true, // has_tool_result_parts
1635                )
1636                .await;
1637
1638            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1639
1640            let store = GraphStore::new(pool);
1641            let count = store.get_metadata("extraction_count").await.unwrap();
1642            assert!(
1643                count.is_none(),
1644                "tool result message must not trigger graph extraction"
1645            );
1646        }
1647
1648        #[tokio::test]
1649        async fn context_filter_excludes_tool_result_messages() {
1650            // FIX-2: context_messages must not include tool result user messages.
1651            // When enqueue_graph_extraction_task collects context, it filters out
1652            // Role::User messages that contain ToolResult parts — only conversational
1653            // user messages are included as extraction context.
1654            //
1655            // This test verifies the guard fires: a tool result message alone is passed
1656            // (has_tool_result_parts=true) → extraction is skipped entirely, so context
1657            // filtering is not exercised. We verify FIX-2 by ensuring a prior tool result
1658            // message in agent.msg.messages is excluded when a subsequent conversational message
1659            // triggers extraction.
1660            let provider = mock_provider(vec![]);
1661            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1662
1663            // Add a tool result message to the agent's message history — this simulates
1664            // a tool call response that arrived before the current conversational turn.
1665            agent.msg.messages.push(Message {
1666                role: Role::User,
1667                content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1668                parts: vec![MessagePart::ToolResult {
1669                    tool_use_id: "abc".to_owned(),
1670                    content: "provider_type = \"openai\"".to_owned(),
1671                    is_error: false,
1672                }],
1673                metadata: MessageMetadata::default(),
1674            });
1675
1676            let pool = agent
1677                .memory_state
1678                .persistence
1679                .memory
1680                .as_ref()
1681                .unwrap()
1682                .sqlite()
1683                .pool()
1684                .clone();
1685
1686            // Trigger extraction for a conversational message (not a tool result).
1687            agent
1688                .enqueue_graph_extraction_task(
1689                    "I prefer Rust for systems programming",
1690                    false,
1691                    false,
1692                )
1693                .await;
1694
1695            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1696
1697            // Extraction must have fired (conversational message, no injection flags).
1698            let store = GraphStore::new(pool);
1699            let count = store.get_metadata("extraction_count").await.unwrap();
1700            assert!(
1701                count.is_some(),
1702                "conversational message must trigger extraction even with prior tool result in history"
1703            );
1704        }
1705    }
1706
1707    // R-PERS-01: unit tests for enqueue_persona_extraction_task guard conditions.
1708    mod persona_extraction_guards {
1709        use super::*;
1710        use zeph_config::PersonaConfig;
1711        use zeph_llm::provider::MessageMetadata;
1712
1713        fn enabled_persona_config() -> PersonaConfig {
1714            PersonaConfig {
1715                enabled: true,
1716                min_messages: 1,
1717                ..PersonaConfig::default()
1718            }
1719        }
1720
1721        async fn agent_with_persona(
1722            provider: &AnyProvider,
1723            config: PersonaConfig,
1724        ) -> Agent<MockChannel> {
1725            let memory =
1726                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1727            let cid = memory.sqlite().create_conversation().await.unwrap();
1728            let mut agent = Agent::new(
1729                provider.clone(),
1730                MockChannel::new(vec![]),
1731                create_test_registry(),
1732                None,
1733                5,
1734                MockToolExecutor::no_tools(),
1735            )
1736            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1737            agent.memory_state.extraction.persona_config = config;
1738            agent
1739        }
1740
1741        #[tokio::test]
1742        async fn disabled_config_skips_spawn() {
1743            // persona.enabled=false → nothing is spawned; persona_memory stays empty.
1744            let provider = mock_provider(vec![]);
1745            let mut agent = agent_with_persona(
1746                &provider,
1747                PersonaConfig {
1748                    enabled: false,
1749                    ..PersonaConfig::default()
1750                },
1751            )
1752            .await;
1753
1754            // Inject a user message so message count is above threshold.
1755            agent.msg.messages.push(zeph_llm::provider::Message {
1756                role: Role::User,
1757                content: "I prefer Rust for systems programming".to_owned(),
1758                parts: vec![],
1759                metadata: MessageMetadata::default(),
1760            });
1761
1762            agent.enqueue_persona_extraction_task();
1763
1764            let store = agent
1765                .memory_state
1766                .persistence
1767                .memory
1768                .as_ref()
1769                .unwrap()
1770                .sqlite()
1771                .clone();
1772            let count = store.count_persona_facts().await.unwrap();
1773            assert_eq!(count, 0, "disabled persona config must not write any facts");
1774        }
1775
1776        #[tokio::test]
1777        async fn below_min_messages_skips_spawn() {
1778            // min_messages=3 but only 2 user messages → should skip.
1779            let provider = mock_provider(vec![]);
1780            let mut agent = agent_with_persona(
1781                &provider,
1782                PersonaConfig {
1783                    enabled: true,
1784                    min_messages: 3,
1785                    ..PersonaConfig::default()
1786                },
1787            )
1788            .await;
1789
1790            for text in ["I use Rust", "I prefer async code"] {
1791                agent.msg.messages.push(zeph_llm::provider::Message {
1792                    role: Role::User,
1793                    content: text.to_owned(),
1794                    parts: vec![],
1795                    metadata: MessageMetadata::default(),
1796                });
1797            }
1798
1799            agent.enqueue_persona_extraction_task();
1800
1801            let store = agent
1802                .memory_state
1803                .persistence
1804                .memory
1805                .as_ref()
1806                .unwrap()
1807                .sqlite()
1808                .clone();
1809            let count = store.count_persona_facts().await.unwrap();
1810            assert_eq!(
1811                count, 0,
1812                "below min_messages threshold must not trigger extraction"
1813            );
1814        }
1815
1816        #[tokio::test]
1817        async fn no_memory_skips_spawn() {
1818            // memory=None → must exit early without panic.
1819            let provider = mock_provider(vec![]);
1820            let channel = MockChannel::new(vec![]);
1821            let registry = create_test_registry();
1822            let executor = MockToolExecutor::no_tools();
1823            let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1824            agent.memory_state.extraction.persona_config = enabled_persona_config();
1825            agent.msg.messages.push(zeph_llm::provider::Message {
1826                role: Role::User,
1827                content: "I like Rust".to_owned(),
1828                parts: vec![],
1829                metadata: MessageMetadata::default(),
1830            });
1831
1832            // Must not panic even without memory.
1833            agent.enqueue_persona_extraction_task();
1834        }
1835
1836        #[tokio::test]
1837        async fn enabled_enough_messages_spawns_extraction() {
1838            // enabled=true, min_messages=1, self-referential message → extraction runs eagerly
1839            // (not fire-and-forget) and chat() is called on the provider, verified via MockProvider.
1840            use zeph_llm::mock::MockProvider;
1841            let (mock, recorded) = MockProvider::default().with_recording();
1842            let provider = AnyProvider::Mock(mock);
1843            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1844
1845            agent.msg.messages.push(zeph_llm::provider::Message {
1846                role: Role::User,
1847                content: "I prefer Rust for systems programming".to_owned(),
1848                parts: vec![],
1849                metadata: MessageMetadata::default(),
1850            });
1851
1852            agent.enqueue_persona_extraction_task();
1853
1854            // Persona extraction runs via BackgroundSupervisor. Wait for tasks to complete.
1855            agent.lifecycle.supervisor.join_all_for_test().await;
1856
1857            let calls = recorded.lock().unwrap();
1858            assert!(
1859                !calls.is_empty(),
1860                "happy-path: provider.chat() must be called when extraction completes"
1861            );
1862        }
1863
1864        #[tokio::test]
1865        async fn messages_capped_at_eight() {
1866            // More than 8 user messages → only 8 are passed to extraction.
1867            // Each message contains "I" so self-referential gate passes.
1868            use zeph_llm::mock::MockProvider;
1869            let (mock, recorded) = MockProvider::default().with_recording();
1870            let provider = AnyProvider::Mock(mock);
1871            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1872
1873            for i in 0..12u32 {
1874                agent.msg.messages.push(zeph_llm::provider::Message {
1875                    role: Role::User,
1876                    content: format!("I like message {i}"),
1877                    parts: vec![],
1878                    metadata: MessageMetadata::default(),
1879                });
1880            }
1881
1882            agent.enqueue_persona_extraction_task();
1883
1884            // Allow the background task to complete before asserting.
1885            agent.lifecycle.supervisor.join_all_for_test().await;
1886
1887            // Verify extraction ran (provider was called).
1888            let calls = recorded.lock().unwrap();
1889            assert!(
1890                !calls.is_empty(),
1891                "extraction must run when enough messages present"
1892            );
1893            // Verify the prompt sent to the provider does not contain messages beyond the 8th.
1894            let prompt = &calls[0];
1895            let user_text = prompt
1896                .iter()
1897                .filter(|m| m.role == Role::User)
1898                .map(|m| m.content.as_str())
1899                .collect::<Vec<_>>()
1900                .join(" ");
1901            // Messages 8..11 ("I like message 8".."I like message 11") must not appear.
1902            assert!(
1903                !user_text.contains("I like message 8"),
1904                "message index 8 must be excluded from extraction input"
1905            );
1906        }
1907
1908        #[test]
1909        fn long_message_truncated_at_char_boundary() {
1910            // Directly verify the per-message truncation logic applied in
1911            // enqueue_persona_extraction_task: a content > 2048 bytes must be capped
1912            // to exactly floor_char_boundary(2048).
1913            let long_content = "x".repeat(3000);
1914            let truncated = if long_content.len() > 2048 {
1915                long_content[..long_content.floor_char_boundary(2048)].to_owned()
1916            } else {
1917                long_content.clone()
1918            };
1919            assert_eq!(
1920                truncated.len(),
1921                2048,
1922                "ASCII content must be truncated to exactly 2048 bytes"
1923            );
1924
1925            // Multi-byte boundary: build a string whose char boundary falls before 2048.
1926            let multi = "é".repeat(1500); // each 'é' is 2 bytes → 3000 bytes total
1927            let truncated_multi = if multi.len() > 2048 {
1928                multi[..multi.floor_char_boundary(2048)].to_owned()
1929            } else {
1930                multi.clone()
1931            };
1932            assert!(
1933                truncated_multi.len() <= 2048,
1934                "multi-byte content must not exceed 2048 bytes"
1935            );
1936            assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1937        }
1938    }
1939
1940    #[tokio::test]
1941    async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1942        let provider = mock_provider(vec![]);
1943        let channel = MockChannel::new(vec![]);
1944        let registry = create_test_registry();
1945        let executor = MockToolExecutor::no_tools();
1946
1947        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1948        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1949        let cid = memory.sqlite().create_conversation().await.unwrap();
1950
1951        // autosave_assistant=false — but User role always takes embedding path
1952        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1953            .with_metrics(tx)
1954            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1955        agent.memory_state.persistence.autosave_assistant = false;
1956        agent.memory_state.persistence.autosave_min_length = 20;
1957
1958        let long_user_msg = "A".repeat(100);
1959        agent
1960            .persist_message(Role::User, &long_user_msg, &[], false)
1961            .await;
1962
1963        let history = agent
1964            .memory_state
1965            .persistence
1966            .memory
1967            .as_ref()
1968            .unwrap()
1969            .sqlite()
1970            .load_history(cid, 50)
1971            .await
1972            .unwrap();
1973        assert_eq!(history.len(), 1, "user message must be saved");
1974        // User messages go through remember_with_parts (embedding path).
1975        // sqlite_message_count must increment regardless of Qdrant availability.
1976        assert_eq!(rx.borrow().sqlite_message_count, 1);
1977    }
1978
1979    // Round-trip tests: verify that persist_message saves the correct parts and they
1980    // are restored correctly by load_history.
1981
1982    #[tokio::test]
1983    async fn persist_message_saves_correct_tool_use_parts() {
1984        use zeph_llm::provider::MessagePart;
1985
1986        let provider = mock_provider(vec![]);
1987        let channel = MockChannel::new(vec![]);
1988        let registry = create_test_registry();
1989        let executor = MockToolExecutor::no_tools();
1990
1991        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1992        let cid = memory.sqlite().create_conversation().await.unwrap();
1993
1994        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1995            std::sync::Arc::new(memory),
1996            cid,
1997            50,
1998            5,
1999            100,
2000        );
2001
2002        let parts = vec![MessagePart::ToolUse {
2003            id: "call_abc123".to_string(),
2004            name: "read_file".to_string(),
2005            input: serde_json::json!({"path": "/tmp/test.txt"}),
2006        }];
2007        let content = "[tool_use: read_file(call_abc123)]";
2008
2009        agent
2010            .persist_message(Role::Assistant, content, &parts, false)
2011            .await;
2012
2013        let history = agent
2014            .memory_state
2015            .persistence
2016            .memory
2017            .as_ref()
2018            .unwrap()
2019            .sqlite()
2020            .load_history(cid, 50)
2021            .await
2022            .unwrap();
2023
2024        assert_eq!(history.len(), 1);
2025        assert_eq!(history[0].role, Role::Assistant);
2026        assert_eq!(history[0].content, content);
2027        assert_eq!(history[0].parts.len(), 1);
2028        match &history[0].parts[0] {
2029            MessagePart::ToolUse { id, name, .. } => {
2030                assert_eq!(id, "call_abc123");
2031                assert_eq!(name, "read_file");
2032            }
2033            other => panic!("expected ToolUse part, got {other:?}"),
2034        }
2035        // Regression guard: assistant message must NOT have ToolResult parts
2036        assert!(
2037            !history[0]
2038                .parts
2039                .iter()
2040                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2041            "assistant message must not contain ToolResult parts"
2042        );
2043    }
2044
2045    #[tokio::test]
2046    async fn persist_message_saves_correct_tool_result_parts() {
2047        use zeph_llm::provider::MessagePart;
2048
2049        let provider = mock_provider(vec![]);
2050        let channel = MockChannel::new(vec![]);
2051        let registry = create_test_registry();
2052        let executor = MockToolExecutor::no_tools();
2053
2054        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2055        let cid = memory.sqlite().create_conversation().await.unwrap();
2056
2057        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2058            std::sync::Arc::new(memory),
2059            cid,
2060            50,
2061            5,
2062            100,
2063        );
2064
2065        let parts = vec![MessagePart::ToolResult {
2066            tool_use_id: "call_abc123".to_string(),
2067            content: "file contents here".to_string(),
2068            is_error: false,
2069        }];
2070        let content = "[tool_result: call_abc123]\nfile contents here";
2071
2072        agent
2073            .persist_message(Role::User, content, &parts, false)
2074            .await;
2075
2076        let history = agent
2077            .memory_state
2078            .persistence
2079            .memory
2080            .as_ref()
2081            .unwrap()
2082            .sqlite()
2083            .load_history(cid, 50)
2084            .await
2085            .unwrap();
2086
2087        assert_eq!(history.len(), 1);
2088        assert_eq!(history[0].role, Role::User);
2089        assert_eq!(history[0].content, content);
2090        assert_eq!(history[0].parts.len(), 1);
2091        match &history[0].parts[0] {
2092            MessagePart::ToolResult {
2093                tool_use_id,
2094                content: result_content,
2095                is_error,
2096            } => {
2097                assert_eq!(tool_use_id, "call_abc123");
2098                assert_eq!(result_content, "file contents here");
2099                assert!(!is_error);
2100            }
2101            other => panic!("expected ToolResult part, got {other:?}"),
2102        }
2103        // Regression guard: user message with ToolResult must NOT have ToolUse parts
2104        assert!(
2105            !history[0]
2106                .parts
2107                .iter()
2108                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2109            "user ToolResult message must not contain ToolUse parts"
2110        );
2111    }
2112
2113    #[tokio::test]
2114    async fn persist_message_roundtrip_preserves_role_part_alignment() {
2115        use zeph_llm::provider::MessagePart;
2116
2117        let provider = mock_provider(vec![]);
2118        let channel = MockChannel::new(vec![]);
2119        let registry = create_test_registry();
2120        let executor = MockToolExecutor::no_tools();
2121
2122        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2123        let cid = memory.sqlite().create_conversation().await.unwrap();
2124
2125        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2126            std::sync::Arc::new(memory),
2127            cid,
2128            50,
2129            5,
2130            100,
2131        );
2132
2133        // Persist assistant message with ToolUse parts
2134        let assistant_parts = vec![MessagePart::ToolUse {
2135            id: "id_1".to_string(),
2136            name: "list_dir".to_string(),
2137            input: serde_json::json!({"path": "/tmp"}),
2138        }];
2139        agent
2140            .persist_message(
2141                Role::Assistant,
2142                "[tool_use: list_dir(id_1)]",
2143                &assistant_parts,
2144                false,
2145            )
2146            .await;
2147
2148        // Persist user message with ToolResult parts
2149        let user_parts = vec![MessagePart::ToolResult {
2150            tool_use_id: "id_1".to_string(),
2151            content: "file1.txt\nfile2.txt".to_string(),
2152            is_error: false,
2153        }];
2154        agent
2155            .persist_message(
2156                Role::User,
2157                "[tool_result: id_1]\nfile1.txt\nfile2.txt",
2158                &user_parts,
2159                false,
2160            )
2161            .await;
2162
2163        let history = agent
2164            .memory_state
2165            .persistence
2166            .memory
2167            .as_ref()
2168            .unwrap()
2169            .sqlite()
2170            .load_history(cid, 50)
2171            .await
2172            .unwrap();
2173
2174        assert_eq!(history.len(), 2);
2175
2176        // First message: assistant + ToolUse
2177        assert_eq!(history[0].role, Role::Assistant);
2178        assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
2179        assert!(
2180            matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
2181            "first message must be assistant ToolUse"
2182        );
2183
2184        // Second message: user + ToolResult
2185        assert_eq!(history[1].role, Role::User);
2186        assert_eq!(
2187            history[1].content,
2188            "[tool_result: id_1]\nfile1.txt\nfile2.txt"
2189        );
2190        assert!(
2191            matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
2192            "second message must be user ToolResult"
2193        );
2194
2195        // Cross-role regression guard: no swapped parts
2196        assert!(
2197            !history[0]
2198                .parts
2199                .iter()
2200                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2201            "assistant message must not have ToolResult parts"
2202        );
2203        assert!(
2204            !history[1]
2205                .parts
2206                .iter()
2207                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2208            "user message must not have ToolUse parts"
2209        );
2210    }
2211
2212    #[tokio::test]
2213    async fn persist_message_saves_correct_tool_output_parts() {
2214        use zeph_llm::provider::MessagePart;
2215
2216        let provider = mock_provider(vec![]);
2217        let channel = MockChannel::new(vec![]);
2218        let registry = create_test_registry();
2219        let executor = MockToolExecutor::no_tools();
2220
2221        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2222        let cid = memory.sqlite().create_conversation().await.unwrap();
2223
2224        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2225            std::sync::Arc::new(memory),
2226            cid,
2227            50,
2228            5,
2229            100,
2230        );
2231
2232        let parts = vec![MessagePart::ToolOutput {
2233            tool_name: "shell".into(),
2234            body: "hello from shell".to_string(),
2235            compacted_at: None,
2236        }];
2237        let content = "[tool: shell]\nhello from shell";
2238
2239        agent
2240            .persist_message(Role::User, content, &parts, false)
2241            .await;
2242
2243        let history = agent
2244            .memory_state
2245            .persistence
2246            .memory
2247            .as_ref()
2248            .unwrap()
2249            .sqlite()
2250            .load_history(cid, 50)
2251            .await
2252            .unwrap();
2253
2254        assert_eq!(history.len(), 1);
2255        assert_eq!(history[0].role, Role::User);
2256        assert_eq!(history[0].content, content);
2257        assert_eq!(history[0].parts.len(), 1);
2258        match &history[0].parts[0] {
2259            MessagePart::ToolOutput {
2260                tool_name,
2261                body,
2262                compacted_at,
2263            } => {
2264                assert_eq!(tool_name, "shell");
2265                assert_eq!(body, "hello from shell");
2266                assert!(compacted_at.is_none());
2267            }
2268            other => panic!("expected ToolOutput part, got {other:?}"),
2269        }
2270    }
2271
2272    // --- sanitize_tool_pairs unit tests ---
2273
2274    #[tokio::test]
2275    async fn load_history_removes_trailing_orphan_tool_use() {
2276        use zeph_llm::provider::MessagePart;
2277
2278        let provider = mock_provider(vec![]);
2279        let channel = MockChannel::new(vec![]);
2280        let registry = create_test_registry();
2281        let executor = MockToolExecutor::no_tools();
2282
2283        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2284        let cid = memory.sqlite().create_conversation().await.unwrap();
2285        let sqlite = memory.sqlite();
2286
2287        // user message (normal)
2288        sqlite
2289            .save_message(cid, "user", "do something with a tool")
2290            .await
2291            .unwrap();
2292
2293        // assistant message with ToolUse parts — no following tool_result (orphan)
2294        let parts = serde_json::to_string(&[MessagePart::ToolUse {
2295            id: "call_orphan".to_string(),
2296            name: "shell".to_string(),
2297            input: serde_json::json!({"command": "ls"}),
2298        }])
2299        .unwrap();
2300        sqlite
2301            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2302            .await
2303            .unwrap();
2304
2305        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2306            std::sync::Arc::new(memory),
2307            cid,
2308            50,
2309            5,
2310            100,
2311        );
2312
2313        let messages_before = agent.msg.messages.len();
2314        agent.load_history().await.unwrap();
2315
2316        // Only the user message should be loaded; orphaned assistant tool_use removed.
2317        assert_eq!(
2318            agent.msg.messages.len(),
2319            messages_before + 1,
2320            "orphaned trailing tool_use must be removed"
2321        );
2322        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2323    }
2324
2325    #[tokio::test]
2326    async fn load_history_removes_leading_orphan_tool_result() {
2327        use zeph_llm::provider::MessagePart;
2328
2329        let provider = mock_provider(vec![]);
2330        let channel = MockChannel::new(vec![]);
2331        let registry = create_test_registry();
2332        let executor = MockToolExecutor::no_tools();
2333
2334        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2335        let cid = memory.sqlite().create_conversation().await.unwrap();
2336        let sqlite = memory.sqlite();
2337
2338        // Leading orphan: user message with ToolResult but no preceding tool_use
2339        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2340            tool_use_id: "call_missing".to_string(),
2341            content: "result data".to_string(),
2342            is_error: false,
2343        }])
2344        .unwrap();
2345        sqlite
2346            .save_message_with_parts(
2347                cid,
2348                "user",
2349                "[tool_result: call_missing]\nresult data",
2350                &result_parts,
2351            )
2352            .await
2353            .unwrap();
2354
2355        // A valid assistant reply after the orphan
2356        sqlite
2357            .save_message(cid, "assistant", "here is my response")
2358            .await
2359            .unwrap();
2360
2361        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2362            std::sync::Arc::new(memory),
2363            cid,
2364            50,
2365            5,
2366            100,
2367        );
2368
2369        let messages_before = agent.msg.messages.len();
2370        agent.load_history().await.unwrap();
2371
2372        // Orphaned leading tool_result removed; only assistant message kept.
2373        assert_eq!(
2374            agent.msg.messages.len(),
2375            messages_before + 1,
2376            "orphaned leading tool_result must be removed"
2377        );
2378        assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2379    }
2380
2381    #[tokio::test]
2382    async fn load_history_preserves_complete_tool_pairs() {
2383        use zeph_llm::provider::MessagePart;
2384
2385        let provider = mock_provider(vec![]);
2386        let channel = MockChannel::new(vec![]);
2387        let registry = create_test_registry();
2388        let executor = MockToolExecutor::no_tools();
2389
2390        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2391        let cid = memory.sqlite().create_conversation().await.unwrap();
2392        let sqlite = memory.sqlite();
2393
2394        // Complete tool_use / tool_result pair
2395        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2396            id: "call_ok".to_string(),
2397            name: "shell".to_string(),
2398            input: serde_json::json!({"command": "pwd"}),
2399        }])
2400        .unwrap();
2401        sqlite
2402            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2403            .await
2404            .unwrap();
2405
2406        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2407            tool_use_id: "call_ok".to_string(),
2408            content: "/home/user".to_string(),
2409            is_error: false,
2410        }])
2411        .unwrap();
2412        sqlite
2413            .save_message_with_parts(
2414                cid,
2415                "user",
2416                "[tool_result: call_ok]\n/home/user",
2417                &result_parts,
2418            )
2419            .await
2420            .unwrap();
2421
2422        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2423            std::sync::Arc::new(memory),
2424            cid,
2425            50,
2426            5,
2427            100,
2428        );
2429
2430        let messages_before = agent.msg.messages.len();
2431        agent.load_history().await.unwrap();
2432
2433        // Both messages must be preserved.
2434        assert_eq!(
2435            agent.msg.messages.len(),
2436            messages_before + 2,
2437            "complete tool_use/tool_result pair must be preserved"
2438        );
2439        assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2440        assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2441    }
2442
2443    #[tokio::test]
2444    async fn load_history_handles_multiple_trailing_orphans() {
2445        use zeph_llm::provider::MessagePart;
2446
2447        let provider = mock_provider(vec![]);
2448        let channel = MockChannel::new(vec![]);
2449        let registry = create_test_registry();
2450        let executor = MockToolExecutor::no_tools();
2451
2452        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2453        let cid = memory.sqlite().create_conversation().await.unwrap();
2454        let sqlite = memory.sqlite();
2455
2456        // Normal user message
2457        sqlite.save_message(cid, "user", "start").await.unwrap();
2458
2459        // First orphaned tool_use
2460        let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2461            id: "call_1".to_string(),
2462            name: "shell".to_string(),
2463            input: serde_json::json!({}),
2464        }])
2465        .unwrap();
2466        sqlite
2467            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2468            .await
2469            .unwrap();
2470
2471        // Second orphaned tool_use (consecutive, no tool_result between them)
2472        let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2473            id: "call_2".to_string(),
2474            name: "read_file".to_string(),
2475            input: serde_json::json!({}),
2476        }])
2477        .unwrap();
2478        sqlite
2479            .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2480            .await
2481            .unwrap();
2482
2483        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2484            std::sync::Arc::new(memory),
2485            cid,
2486            50,
2487            5,
2488            100,
2489        );
2490
2491        let messages_before = agent.msg.messages.len();
2492        agent.load_history().await.unwrap();
2493
2494        // Both orphaned tool_use messages removed; only the user message kept.
2495        assert_eq!(
2496            agent.msg.messages.len(),
2497            messages_before + 1,
2498            "all trailing orphaned tool_use messages must be removed"
2499        );
2500        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2501    }
2502
2503    #[tokio::test]
2504    async fn load_history_no_tool_messages_unchanged() {
2505        let provider = mock_provider(vec![]);
2506        let channel = MockChannel::new(vec![]);
2507        let registry = create_test_registry();
2508        let executor = MockToolExecutor::no_tools();
2509
2510        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2511        let cid = memory.sqlite().create_conversation().await.unwrap();
2512        let sqlite = memory.sqlite();
2513
2514        sqlite.save_message(cid, "user", "hello").await.unwrap();
2515        sqlite
2516            .save_message(cid, "assistant", "hi there")
2517            .await
2518            .unwrap();
2519        sqlite
2520            .save_message(cid, "user", "how are you?")
2521            .await
2522            .unwrap();
2523
2524        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2525            std::sync::Arc::new(memory),
2526            cid,
2527            50,
2528            5,
2529            100,
2530        );
2531
2532        let messages_before = agent.msg.messages.len();
2533        agent.load_history().await.unwrap();
2534
2535        // All three plain messages must be preserved.
2536        assert_eq!(
2537            agent.msg.messages.len(),
2538            messages_before + 3,
2539            "plain messages without tool parts must pass through unchanged"
2540        );
2541    }
2542
2543    #[tokio::test]
2544    async fn load_history_removes_both_leading_and_trailing_orphans() {
2545        use zeph_llm::provider::MessagePart;
2546
2547        let provider = mock_provider(vec![]);
2548        let channel = MockChannel::new(vec![]);
2549        let registry = create_test_registry();
2550        let executor = MockToolExecutor::no_tools();
2551
2552        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2553        let cid = memory.sqlite().create_conversation().await.unwrap();
2554        let sqlite = memory.sqlite();
2555
2556        // Leading orphan: user message with ToolResult, no preceding tool_use
2557        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2558            tool_use_id: "call_leading".to_string(),
2559            content: "orphaned result".to_string(),
2560            is_error: false,
2561        }])
2562        .unwrap();
2563        sqlite
2564            .save_message_with_parts(
2565                cid,
2566                "user",
2567                "[tool_result: call_leading]\norphaned result",
2568                &result_parts,
2569            )
2570            .await
2571            .unwrap();
2572
2573        // Valid middle messages
2574        sqlite
2575            .save_message(cid, "user", "what is 2+2?")
2576            .await
2577            .unwrap();
2578        sqlite.save_message(cid, "assistant", "4").await.unwrap();
2579
2580        // Trailing orphan: assistant message with ToolUse, no following tool_result
2581        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2582            id: "call_trailing".to_string(),
2583            name: "shell".to_string(),
2584            input: serde_json::json!({"command": "date"}),
2585        }])
2586        .unwrap();
2587        sqlite
2588            .save_message_with_parts(
2589                cid,
2590                "assistant",
2591                "[tool_use: shell(call_trailing)]",
2592                &use_parts,
2593            )
2594            .await
2595            .unwrap();
2596
2597        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2598            std::sync::Arc::new(memory),
2599            cid,
2600            50,
2601            5,
2602            100,
2603        );
2604
2605        let messages_before = agent.msg.messages.len();
2606        agent.load_history().await.unwrap();
2607
2608        // Both orphans removed; only the 2 valid middle messages kept.
2609        assert_eq!(
2610            agent.msg.messages.len(),
2611            messages_before + 2,
2612            "both leading and trailing orphans must be removed"
2613        );
2614        assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2615        assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2616        assert_eq!(
2617            agent.msg.messages[messages_before + 1].role,
2618            Role::Assistant
2619        );
2620        assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2621    }
2622
2623    /// RC1 regression: mid-history assistant[`ToolUse`] without a following user[`ToolResult`]
2624    /// must have its `ToolUse` parts stripped (text preserved). The message count stays the same
2625    /// because the assistant message has a text content fallback; only `ToolUse` parts are
2626    /// removed.
2627    #[tokio::test]
2628    async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2629        use zeph_llm::provider::MessagePart;
2630
2631        let provider = mock_provider(vec![]);
2632        let channel = MockChannel::new(vec![]);
2633        let registry = create_test_registry();
2634        let executor = MockToolExecutor::no_tools();
2635
2636        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2637        let cid = memory.sqlite().create_conversation().await.unwrap();
2638        let sqlite = memory.sqlite();
2639
2640        // Normal first exchange.
2641        sqlite
2642            .save_message(cid, "user", "first question")
2643            .await
2644            .unwrap();
2645        sqlite
2646            .save_message(cid, "assistant", "first answer")
2647            .await
2648            .unwrap();
2649
2650        // Mid-history orphan: assistant with ToolUse but NO following ToolResult user message.
2651        // This models the compaction-split scenario (RC2) where replace_conversation hid the
2652        // user[ToolResult] but left the assistant[ToolUse] visible.
2653        let use_parts = serde_json::to_string(&[
2654            MessagePart::ToolUse {
2655                id: "call_mid_1".to_string(),
2656                name: "shell".to_string(),
2657                input: serde_json::json!({"command": "ls"}),
2658            },
2659            MessagePart::Text {
2660                text: "Let me check the files.".to_string(),
2661            },
2662        ])
2663        .unwrap();
2664        sqlite
2665            .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2666            .await
2667            .unwrap();
2668
2669        // Another normal exchange after the orphan.
2670        sqlite
2671            .save_message(cid, "user", "second question")
2672            .await
2673            .unwrap();
2674        sqlite
2675            .save_message(cid, "assistant", "second answer")
2676            .await
2677            .unwrap();
2678
2679        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2680            std::sync::Arc::new(memory),
2681            cid,
2682            50,
2683            5,
2684            100,
2685        );
2686
2687        let messages_before = agent.msg.messages.len();
2688        agent.load_history().await.unwrap();
2689
2690        // All 5 messages remain (orphan message kept because it has text), but the orphaned
2691        // message must have its ToolUse parts stripped.
2692        assert_eq!(
2693            agent.msg.messages.len(),
2694            messages_before + 5,
2695            "message count must be 5 (orphan message kept — has text content)"
2696        );
2697
2698        // The orphaned assistant message (index 2 in the loaded slice) must have no ToolUse parts.
2699        let orphan = &agent.msg.messages[messages_before + 2];
2700        assert_eq!(orphan.role, Role::Assistant);
2701        assert!(
2702            !orphan
2703                .parts
2704                .iter()
2705                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2706            "orphaned ToolUse parts must be stripped from mid-history message"
2707        );
2708        // Text part must be preserved.
2709        assert!(
2710            orphan.parts.iter().any(
2711                |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2712            ),
2713            "text content of orphaned assistant message must be preserved"
2714        );
2715    }
2716
2717    /// RC3 regression: a user message with empty `content` but non-empty `parts` (`ToolResult`)
2718    /// must NOT be skipped by `load_history`. Previously the empty-content check dropped these
2719    /// messages before `sanitize_tool_pairs` ran, leaving the preceding assistant `ToolUse`
2720    /// orphaned.
2721    #[tokio::test]
2722    async fn load_history_keeps_tool_only_user_message() {
2723        use zeph_llm::provider::MessagePart;
2724
2725        let provider = mock_provider(vec![]);
2726        let channel = MockChannel::new(vec![]);
2727        let registry = create_test_registry();
2728        let executor = MockToolExecutor::no_tools();
2729
2730        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2731        let cid = memory.sqlite().create_conversation().await.unwrap();
2732        let sqlite = memory.sqlite();
2733
2734        // Assistant sends a ToolUse.
2735        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2736            id: "call_rc3".to_string(),
2737            name: "memory_save".to_string(),
2738            input: serde_json::json!({"content": "something"}),
2739        }])
2740        .unwrap();
2741        sqlite
2742            .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2743            .await
2744            .unwrap();
2745
2746        // User message has empty text content but carries ToolResult in parts — native tool pattern.
2747        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2748            tool_use_id: "call_rc3".to_string(),
2749            content: "saved".to_string(),
2750            is_error: false,
2751        }])
2752        .unwrap();
2753        sqlite
2754            .save_message_with_parts(cid, "user", "", &result_parts)
2755            .await
2756            .unwrap();
2757
2758        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2759
2760        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2761            std::sync::Arc::new(memory),
2762            cid,
2763            50,
2764            5,
2765            100,
2766        );
2767
2768        let messages_before = agent.msg.messages.len();
2769        agent.load_history().await.unwrap();
2770
2771        // All 3 messages must be loaded — the empty-content ToolResult user message must NOT be
2772        // dropped.
2773        assert_eq!(
2774            agent.msg.messages.len(),
2775            messages_before + 3,
2776            "user message with empty content but ToolResult parts must not be dropped"
2777        );
2778
2779        // The user message at index 1 must still carry the ToolResult part.
2780        let user_msg = &agent.msg.messages[messages_before + 1];
2781        assert_eq!(user_msg.role, Role::User);
2782        assert!(
2783            user_msg.parts.iter().any(
2784                |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2785            ),
2786            "ToolResult part must be preserved on user message with empty content"
2787        );
2788    }
2789
2790    /// RC2 reverse pass: a user message with a `ToolResult` whose `tool_use_id` has no matching
2791    /// `ToolUse` in the preceding assistant message must be stripped by
2792    /// `strip_mid_history_orphans`.
2793    #[tokio::test]
2794    async fn strip_orphans_removes_orphaned_tool_result() {
2795        use zeph_llm::provider::MessagePart;
2796
2797        let provider = mock_provider(vec![]);
2798        let channel = MockChannel::new(vec![]);
2799        let registry = create_test_registry();
2800        let executor = MockToolExecutor::no_tools();
2801
2802        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2803        let cid = memory.sqlite().create_conversation().await.unwrap();
2804        let sqlite = memory.sqlite();
2805
2806        // Normal exchange before the orphan.
2807        sqlite.save_message(cid, "user", "hello").await.unwrap();
2808        sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2809
2810        // Assistant message that does NOT contain a ToolUse.
2811        sqlite
2812            .save_message(cid, "assistant", "plain answer")
2813            .await
2814            .unwrap();
2815
2816        // User message references a tool_use_id that was never sent by the preceding assistant.
2817        let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2818            tool_use_id: "call_nonexistent".to_string(),
2819            content: "stale result".to_string(),
2820            is_error: false,
2821        }])
2822        .unwrap();
2823        sqlite
2824            .save_message_with_parts(
2825                cid,
2826                "user",
2827                "[tool_result: call_nonexistent]\nstale result",
2828                &orphan_result_parts,
2829            )
2830            .await
2831            .unwrap();
2832
2833        sqlite
2834            .save_message(cid, "assistant", "final")
2835            .await
2836            .unwrap();
2837
2838        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2839            std::sync::Arc::new(memory),
2840            cid,
2841            50,
2842            5,
2843            100,
2844        );
2845
2846        let messages_before = agent.msg.messages.len();
2847        agent.load_history().await.unwrap();
2848
2849        // The orphaned ToolResult part must have been stripped from the user message.
2850        // The user message itself may be removed (parts empty + content non-empty) or kept with
2851        // the text content — but it must NOT retain the orphaned ToolResult part.
2852        let loaded = &agent.msg.messages[messages_before..];
2853        for msg in loaded {
2854            assert!(
2855                !msg.parts.iter().any(|p| matches!(
2856                    p,
2857                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2858                )),
2859                "orphaned ToolResult part must be stripped from history"
2860            );
2861        }
2862    }
2863
2864    /// RC2 reverse pass: a complete `tool_use` + `tool_result` pair must pass through the reverse
2865    /// orphan check intact; the fix must not strip valid `ToolResult` parts.
2866    #[tokio::test]
2867    async fn strip_orphans_keeps_complete_pair() {
2868        use zeph_llm::provider::MessagePart;
2869
2870        let provider = mock_provider(vec![]);
2871        let channel = MockChannel::new(vec![]);
2872        let registry = create_test_registry();
2873        let executor = MockToolExecutor::no_tools();
2874
2875        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2876        let cid = memory.sqlite().create_conversation().await.unwrap();
2877        let sqlite = memory.sqlite();
2878
2879        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2880            id: "call_valid".to_string(),
2881            name: "shell".to_string(),
2882            input: serde_json::json!({"command": "ls"}),
2883        }])
2884        .unwrap();
2885        sqlite
2886            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2887            .await
2888            .unwrap();
2889
2890        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2891            tool_use_id: "call_valid".to_string(),
2892            content: "file.rs".to_string(),
2893            is_error: false,
2894        }])
2895        .unwrap();
2896        sqlite
2897            .save_message_with_parts(cid, "user", "", &result_parts)
2898            .await
2899            .unwrap();
2900
2901        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2902            std::sync::Arc::new(memory),
2903            cid,
2904            50,
2905            5,
2906            100,
2907        );
2908
2909        let messages_before = agent.msg.messages.len();
2910        agent.load_history().await.unwrap();
2911
2912        assert_eq!(
2913            agent.msg.messages.len(),
2914            messages_before + 2,
2915            "complete tool_use/tool_result pair must be preserved"
2916        );
2917
2918        let user_msg = &agent.msg.messages[messages_before + 1];
2919        assert!(
2920            user_msg.parts.iter().any(|p| matches!(
2921                p,
2922                MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2923            )),
2924            "ToolResult part for a matched tool_use must not be stripped"
2925        );
2926    }
2927
2928    /// RC2 reverse pass: history with a mix of complete pairs and orphaned `ToolResult` messages.
2929    /// Orphaned `ToolResult` parts must be stripped; complete pairs must pass through intact.
2930    #[tokio::test]
2931    async fn strip_orphans_mixed_history() {
2932        use zeph_llm::provider::MessagePart;
2933
2934        let provider = mock_provider(vec![]);
2935        let channel = MockChannel::new(vec![]);
2936        let registry = create_test_registry();
2937        let executor = MockToolExecutor::no_tools();
2938
2939        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2940        let cid = memory.sqlite().create_conversation().await.unwrap();
2941        let sqlite = memory.sqlite();
2942
2943        // First: complete tool_use / tool_result pair.
2944        let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2945            id: "call_good".to_string(),
2946            name: "shell".to_string(),
2947            input: serde_json::json!({"command": "pwd"}),
2948        }])
2949        .unwrap();
2950        sqlite
2951            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2952            .await
2953            .unwrap();
2954
2955        let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2956            tool_use_id: "call_good".to_string(),
2957            content: "/home".to_string(),
2958            is_error: false,
2959        }])
2960        .unwrap();
2961        sqlite
2962            .save_message_with_parts(cid, "user", "", &result_parts_ok)
2963            .await
2964            .unwrap();
2965
2966        // Second: plain assistant message followed by an orphaned ToolResult user message.
2967        sqlite
2968            .save_message(cid, "assistant", "text only")
2969            .await
2970            .unwrap();
2971
2972        let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2973            tool_use_id: "call_ghost".to_string(),
2974            content: "ghost result".to_string(),
2975            is_error: false,
2976        }])
2977        .unwrap();
2978        sqlite
2979            .save_message_with_parts(
2980                cid,
2981                "user",
2982                "[tool_result: call_ghost]\nghost result",
2983                &orphan_parts,
2984            )
2985            .await
2986            .unwrap();
2987
2988        sqlite
2989            .save_message(cid, "assistant", "final reply")
2990            .await
2991            .unwrap();
2992
2993        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2994            std::sync::Arc::new(memory),
2995            cid,
2996            50,
2997            5,
2998            100,
2999        );
3000
3001        let messages_before = agent.msg.messages.len();
3002        agent.load_history().await.unwrap();
3003
3004        let loaded = &agent.msg.messages[messages_before..];
3005
3006        // The orphaned ToolResult part must not appear in any message.
3007        for msg in loaded {
3008            assert!(
3009                !msg.parts.iter().any(|p| matches!(
3010                    p,
3011                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
3012                )),
3013                "orphaned ToolResult (call_ghost) must be stripped from history"
3014            );
3015        }
3016
3017        // The matched ToolResult must still be present on the user message following the
3018        // first assistant message.
3019        let has_good_result = loaded.iter().any(|msg| {
3020            msg.role == Role::User
3021                && msg.parts.iter().any(|p| {
3022                    matches!(
3023                        p,
3024                        MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
3025                    )
3026                })
3027        });
3028        assert!(
3029            has_good_result,
3030            "matched ToolResult (call_good) must be preserved in history"
3031        );
3032    }
3033
3034    /// Regression: a properly matched `tool_use`/`tool_result` pair must NOT be touched by the
3035    /// mid-history scan — ensures the fix doesn't break valid tool exchanges.
3036    #[tokio::test]
3037    async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
3038        use zeph_llm::provider::MessagePart;
3039
3040        let provider = mock_provider(vec![]);
3041        let channel = MockChannel::new(vec![]);
3042        let registry = create_test_registry();
3043        let executor = MockToolExecutor::no_tools();
3044
3045        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3046        let cid = memory.sqlite().create_conversation().await.unwrap();
3047        let sqlite = memory.sqlite();
3048
3049        sqlite
3050            .save_message(cid, "user", "run a command")
3051            .await
3052            .unwrap();
3053
3054        // Assistant sends a ToolUse.
3055        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3056            id: "call_ok".to_string(),
3057            name: "shell".to_string(),
3058            input: serde_json::json!({"command": "echo hi"}),
3059        }])
3060        .unwrap();
3061        sqlite
3062            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
3063            .await
3064            .unwrap();
3065
3066        // Matching user ToolResult follows.
3067        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3068            tool_use_id: "call_ok".to_string(),
3069            content: "hi".to_string(),
3070            is_error: false,
3071        }])
3072        .unwrap();
3073        sqlite
3074            .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
3075            .await
3076            .unwrap();
3077
3078        sqlite.save_message(cid, "assistant", "done").await.unwrap();
3079
3080        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3081            std::sync::Arc::new(memory),
3082            cid,
3083            50,
3084            5,
3085            100,
3086        );
3087
3088        let messages_before = agent.msg.messages.len();
3089        agent.load_history().await.unwrap();
3090
3091        // All 4 messages preserved, tool_use parts intact.
3092        assert_eq!(
3093            agent.msg.messages.len(),
3094            messages_before + 4,
3095            "matched tool pair must not be removed"
3096        );
3097        let tool_msg = &agent.msg.messages[messages_before + 1];
3098        assert!(
3099            tool_msg
3100                .parts
3101                .iter()
3102                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
3103            "matched ToolUse parts must be preserved"
3104        );
3105    }
3106
3107    /// RC5: `persist_cancelled_tool_results` must persist a tombstone user message containing
3108    /// `is_error=true` `ToolResult` parts for all `tool_calls` IDs so the preceding assistant
3109    /// `ToolUse` is never orphaned in the DB after a cancellation.
3110    #[tokio::test]
3111    async fn persist_cancelled_tool_results_pairs_tool_use() {
3112        use zeph_llm::provider::MessagePart;
3113
3114        let provider = mock_provider(vec![]);
3115        let channel = MockChannel::new(vec![]);
3116        let registry = create_test_registry();
3117        let executor = MockToolExecutor::no_tools();
3118
3119        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3120        let cid = memory.sqlite().create_conversation().await.unwrap();
3121
3122        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3123            std::sync::Arc::new(memory),
3124            cid,
3125            50,
3126            5,
3127            100,
3128        );
3129
3130        // Simulate: assistant message with two ToolUse parts already persisted.
3131        let tool_calls = vec![
3132            zeph_llm::provider::ToolUseRequest {
3133                id: "cancel_id_1".to_string(),
3134                name: "shell".to_string().into(),
3135                input: serde_json::json!({}),
3136            },
3137            zeph_llm::provider::ToolUseRequest {
3138                id: "cancel_id_2".to_string(),
3139                name: "read_file".to_string().into(),
3140                input: serde_json::json!({}),
3141            },
3142        ];
3143
3144        agent.persist_cancelled_tool_results(&tool_calls).await;
3145
3146        let history = agent
3147            .memory_state
3148            .persistence
3149            .memory
3150            .as_ref()
3151            .unwrap()
3152            .sqlite()
3153            .load_history(cid, 50)
3154            .await
3155            .unwrap();
3156
3157        // Exactly one user message must have been persisted.
3158        assert_eq!(history.len(), 1);
3159        assert_eq!(history[0].role, Role::User);
3160
3161        // It must contain is_error=true ToolResult for each tool call ID.
3162        for tc in &tool_calls {
3163            assert!(
3164                history[0].parts.iter().any(|p| matches!(
3165                    p,
3166                    MessagePart::ToolResult { tool_use_id, is_error, .. }
3167                        if tool_use_id == &tc.id && *is_error
3168                )),
3169                "tombstone ToolResult for {} must be present and is_error=true",
3170                tc.id
3171            );
3172        }
3173    }
3174
3175    // ---- has_meaningful_content unit tests ----
3176
3177    #[test]
3178    fn meaningful_content_empty_string() {
3179        assert!(!has_meaningful_content(""));
3180    }
3181
3182    #[test]
3183    fn meaningful_content_whitespace_only() {
3184        assert!(!has_meaningful_content("   \n\t  "));
3185    }
3186
3187    #[test]
3188    fn meaningful_content_tool_use_only() {
3189        assert!(!has_meaningful_content("[tool_use: shell(call_1)]"));
3190    }
3191
3192    #[test]
3193    fn meaningful_content_tool_use_no_parens() {
3194        // Format produced when tool_use is stored without explicit id parens.
3195        assert!(!has_meaningful_content("[tool_use: memory_save]"));
3196    }
3197
3198    #[test]
3199    fn meaningful_content_tool_result_with_body() {
3200        assert!(!has_meaningful_content(
3201            "[tool_result: call_1]\nsome output here"
3202        ));
3203    }
3204
3205    #[test]
3206    fn meaningful_content_tool_result_empty_body() {
3207        assert!(!has_meaningful_content("[tool_result: call_1]\n"));
3208    }
3209
3210    #[test]
3211    fn meaningful_content_tool_output_inline() {
3212        assert!(!has_meaningful_content("[tool output: bash] some result"));
3213    }
3214
3215    #[test]
3216    fn meaningful_content_tool_output_pruned() {
3217        assert!(!has_meaningful_content("[tool output: bash] (pruned)"));
3218    }
3219
3220    #[test]
3221    fn meaningful_content_tool_output_fenced() {
3222        assert!(!has_meaningful_content(
3223            "[tool output: bash]\n```\nls output\n```"
3224        ));
3225    }
3226
3227    #[test]
3228    fn meaningful_content_multiple_tool_use_tags() {
3229        assert!(!has_meaningful_content(
3230            "[tool_use: bash(id1)][tool_use: read(id2)]"
3231        ));
3232    }
3233
3234    #[test]
3235    fn meaningful_content_multiple_tool_use_tags_space_separator() {
3236        // Space between tags is not meaningful content.
3237        assert!(!has_meaningful_content(
3238            "[tool_use: bash(id1)] [tool_use: read(id2)]"
3239        ));
3240    }
3241
3242    #[test]
3243    fn meaningful_content_multiple_tool_use_tags_newline_separator() {
3244        // Newline-only separator is also not meaningful.
3245        assert!(!has_meaningful_content(
3246            "[tool_use: bash(id1)]\n[tool_use: read(id2)]"
3247        ));
3248    }
3249
3250    #[test]
3251    fn meaningful_content_tool_result_followed_by_tool_use() {
3252        // Two tags in sequence — no real text between them.
3253        assert!(!has_meaningful_content(
3254            "[tool_result: call_1]\nresult\n[tool_use: bash(call_2)]"
3255        ));
3256    }
3257
3258    #[test]
3259    fn meaningful_content_real_text_only() {
3260        assert!(has_meaningful_content("Hello, how can I help you?"));
3261    }
3262
3263    #[test]
3264    fn meaningful_content_text_before_tool_tag() {
3265        assert!(has_meaningful_content("Let me check. [tool_use: bash(id)]"));
3266    }
3267
3268    #[test]
3269    fn meaningful_content_text_after_tool_use_tag() {
3270        // Text appearing after a [tool_use: name] tag (without parens) is a JSON body
3271        // in the request-builder format — but since that format never reaches the DB,
3272        // this test verifies conservative behavior: the helper returns true (do not delete).
3273        assert!(has_meaningful_content("[tool_use: bash] I ran the command"));
3274    }
3275
3276    #[test]
3277    fn meaningful_content_text_between_tags() {
3278        assert!(has_meaningful_content(
3279            "[tool_use: bash(id1)]\nand then\n[tool_use: read(id2)]"
3280        ));
3281    }
3282
3283    #[test]
3284    fn meaningful_content_malformed_tag_no_closing_bracket() {
3285        // Conservative: malformed tag must not trigger delete.
3286        assert!(has_meaningful_content("[tool_use: "));
3287    }
3288
3289    #[test]
3290    fn meaningful_content_tool_use_and_tool_result_only() {
3291        // Typical persisted assistant+user pair content with no extra text.
3292        assert!(!has_meaningful_content(
3293            "[tool_use: memory_save(call_abc)]\n[tool_result: call_abc]\nsaved"
3294        ));
3295    }
3296
3297    #[test]
3298    fn meaningful_content_tool_result_body_with_json_array() {
3299        assert!(!has_meaningful_content(
3300            "[tool_result: id1]\n[\"array\", \"value\"]"
3301        ));
3302    }
3303
3304    // ---- Integration tests for the #2529 fix: soft-delete of legacy-content orphans ----
3305
3306    /// #2529 regression: orphaned assistant `ToolUse` + user `ToolResult` pair where BOTH messages
3307    /// have content consisting solely of legacy tool bracket strings (no human-readable text).
3308    ///
3309    /// Before the fix, `content.trim().is_empty()` returned false for these messages, so they
3310    /// were never soft-deleted and the WARN log repeated on every session restart.
3311    ///
3312    /// After the fix, `has_meaningful_content` returns false for legacy-only content, so both
3313    /// orphaned messages are soft-deleted (non-null `deleted_at`) in `SQLite`.
3314    #[tokio::test]
3315    async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
3316        use zeph_llm::provider::MessagePart;
3317
3318        let provider = mock_provider(vec![]);
3319        let channel = MockChannel::new(vec![]);
3320        let registry = create_test_registry();
3321        let executor = MockToolExecutor::no_tools();
3322
3323        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3324        let cid = memory.sqlite().create_conversation().await.unwrap();
3325        let sqlite = memory.sqlite();
3326
3327        // A normal user message that anchors the conversation.
3328        sqlite
3329            .save_message(cid, "user", "save this for me")
3330            .await
3331            .unwrap();
3332
3333        // Orphaned assistant[ToolUse]: content is ONLY a legacy tool tag — no matching
3334        // ToolResult follows. This is the exact pattern that triggered #2529.
3335        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3336            id: "call_2529".to_string(),
3337            name: "memory_save".to_string(),
3338            input: serde_json::json!({"content": "save this"}),
3339        }])
3340        .unwrap();
3341        let orphan_assistant_id = sqlite
3342            .save_message_with_parts(
3343                cid,
3344                "assistant",
3345                "[tool_use: memory_save(call_2529)]",
3346                &use_parts,
3347            )
3348            .await
3349            .unwrap();
3350
3351        // Orphaned user[ToolResult]: content is ONLY a legacy tool tag + body.
3352        // Its tool_use_id ("call_2529") does not match any ToolUse in the preceding assistant
3353        // message in this position (will be made orphaned by inserting a plain assistant message
3354        // between them to break the pair).
3355        sqlite
3356            .save_message(cid, "assistant", "here is a plain reply")
3357            .await
3358            .unwrap();
3359
3360        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3361            tool_use_id: "call_2529".to_string(),
3362            content: "saved".to_string(),
3363            is_error: false,
3364        }])
3365        .unwrap();
3366        let orphan_user_id = sqlite
3367            .save_message_with_parts(
3368                cid,
3369                "user",
3370                "[tool_result: call_2529]\nsaved",
3371                &result_parts,
3372            )
3373            .await
3374            .unwrap();
3375
3376        // Final plain message so history doesn't end on the orphan.
3377        sqlite.save_message(cid, "assistant", "done").await.unwrap();
3378
3379        let memory_arc = std::sync::Arc::new(memory);
3380        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3381            memory_arc.clone(),
3382            cid,
3383            50,
3384            5,
3385            100,
3386        );
3387
3388        agent.load_history().await.unwrap();
3389
3390        // Verify that both orphaned messages now have `deleted_at IS NOT NULL` in SQLite.
3391        // COUNT(*) WHERE deleted_at IS NOT NULL returns 1 if soft-deleted, 0 otherwise.
3392        let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3393            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3394        )
3395        .bind(orphan_assistant_id)
3396        .fetch_all(memory_arc.sqlite().pool())
3397        .await
3398        .unwrap();
3399
3400        let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3401            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3402        )
3403        .bind(orphan_user_id)
3404        .fetch_all(memory_arc.sqlite().pool())
3405        .await
3406        .unwrap();
3407
3408        assert_eq!(
3409            assistant_deleted_count.first().copied().unwrap_or(0),
3410            1,
3411            "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3412        );
3413        assert_eq!(
3414            user_deleted_count.first().copied().unwrap_or(0),
3415            1,
3416            "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3417        );
3418    }
3419
3420    /// #2529 idempotency: after soft-delete on first `load_history`, a second call must not
3421    /// re-load the soft-deleted orphans. This ensures the WARN log does not repeat on the
3422    /// next session startup for the same orphaned messages.
3423    #[tokio::test]
3424    async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3425        use zeph_llm::provider::MessagePart;
3426
3427        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3428        let cid = memory.sqlite().create_conversation().await.unwrap();
3429        let sqlite = memory.sqlite();
3430
3431        // Normal anchor message.
3432        sqlite
3433            .save_message(cid, "user", "do something")
3434            .await
3435            .unwrap();
3436
3437        // Orphaned assistant[ToolUse] with legacy-only content.
3438        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3439            id: "call_idem".to_string(),
3440            name: "shell".to_string(),
3441            input: serde_json::json!({"command": "ls"}),
3442        }])
3443        .unwrap();
3444        sqlite
3445            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3446            .await
3447            .unwrap();
3448
3449        // Break the pair: insert a plain assistant message so the ToolUse is mid-history orphan.
3450        sqlite
3451            .save_message(cid, "assistant", "continuing")
3452            .await
3453            .unwrap();
3454
3455        // Orphaned user[ToolResult] with legacy-only content.
3456        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3457            tool_use_id: "call_idem".to_string(),
3458            content: "output".to_string(),
3459            is_error: false,
3460        }])
3461        .unwrap();
3462        sqlite
3463            .save_message_with_parts(
3464                cid,
3465                "user",
3466                "[tool_result: call_idem]\noutput",
3467                &result_parts,
3468            )
3469            .await
3470            .unwrap();
3471
3472        sqlite
3473            .save_message(cid, "assistant", "final")
3474            .await
3475            .unwrap();
3476
3477        let memory_arc = std::sync::Arc::new(memory);
3478
3479        // First session: load_history performs soft-delete of the orphaned pair.
3480        let mut agent1 = Agent::new(
3481            mock_provider(vec![]),
3482            MockChannel::new(vec![]),
3483            create_test_registry(),
3484            None,
3485            5,
3486            MockToolExecutor::no_tools(),
3487        )
3488        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3489        agent1.load_history().await.unwrap();
3490        let count_after_first = agent1.msg.messages.len();
3491
3492        // Second session: a fresh agent loading the same conversation must not see the
3493        // soft-deleted orphans — the WARN log must not repeat.
3494        let mut agent2 = Agent::new(
3495            mock_provider(vec![]),
3496            MockChannel::new(vec![]),
3497            create_test_registry(),
3498            None,
3499            5,
3500            MockToolExecutor::no_tools(),
3501        )
3502        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3503        agent2.load_history().await.unwrap();
3504        let count_after_second = agent2.msg.messages.len();
3505
3506        // Both sessions must load the same number of messages — soft-deleted orphans excluded.
3507        assert_eq!(
3508            count_after_first, count_after_second,
3509            "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3510        );
3511    }
3512
3513    /// Edge case for #2529: an orphaned assistant message whose content has BOTH meaningful text
3514    /// AND a `tool_use` tag must NOT be soft-deleted. The `ToolUse` parts are stripped but the
3515    /// message is kept because it has human-readable content.
3516    #[tokio::test]
3517    async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3518        use zeph_llm::provider::MessagePart;
3519
3520        let provider = mock_provider(vec![]);
3521        let channel = MockChannel::new(vec![]);
3522        let registry = create_test_registry();
3523        let executor = MockToolExecutor::no_tools();
3524
3525        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3526        let cid = memory.sqlite().create_conversation().await.unwrap();
3527        let sqlite = memory.sqlite();
3528
3529        // Normal first exchange.
3530        sqlite
3531            .save_message(cid, "user", "check the files")
3532            .await
3533            .unwrap();
3534
3535        // Assistant message: has BOTH meaningful text AND a ToolUse part.
3536        // Content contains real prose + legacy tag — has_meaningful_content must return true.
3537        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3538            id: "call_mixed".to_string(),
3539            name: "shell".to_string(),
3540            input: serde_json::json!({"command": "ls"}),
3541        }])
3542        .unwrap();
3543        sqlite
3544            .save_message_with_parts(
3545                cid,
3546                "assistant",
3547                "Let me list the directory. [tool_use: shell(call_mixed)]",
3548                &use_parts,
3549            )
3550            .await
3551            .unwrap();
3552
3553        // No matching ToolResult follows — the ToolUse is orphaned.
3554        sqlite.save_message(cid, "user", "thanks").await.unwrap();
3555        sqlite
3556            .save_message(cid, "assistant", "you are welcome")
3557            .await
3558            .unwrap();
3559
3560        let memory_arc = std::sync::Arc::new(memory);
3561        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3562            memory_arc.clone(),
3563            cid,
3564            50,
3565            5,
3566            100,
3567        );
3568
3569        let messages_before = agent.msg.messages.len();
3570        agent.load_history().await.unwrap();
3571
3572        // All 4 messages must be present — the mixed-content assistant message is KEPT.
3573        assert_eq!(
3574            agent.msg.messages.len(),
3575            messages_before + 4,
3576            "assistant message with text + tool tag must not be removed after ToolUse strip"
3577        );
3578
3579        // The mixed-content assistant message must have its ToolUse parts stripped.
3580        let mixed_msg = agent
3581            .msg
3582            .messages
3583            .iter()
3584            .find(|m| m.content.contains("Let me list the directory"))
3585            .expect("mixed-content assistant message must still be in history");
3586        assert!(
3587            !mixed_msg
3588                .parts
3589                .iter()
3590                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3591            "orphaned ToolUse parts must be stripped even when message has meaningful text"
3592        );
3593        assert_eq!(
3594            mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3595            "content field must be unchanged — only parts are stripped"
3596        );
3597    }
3598
3599    // ── [skipped]/[stopped] ToolResult embedding guard (#2620) ──────────────
3600
3601    #[tokio::test]
3602    async fn persist_message_skipped_tool_result_does_not_embed() {
3603        use zeph_llm::provider::MessagePart;
3604
3605        let provider = mock_provider(vec![]);
3606        let channel = MockChannel::new(vec![]);
3607        let registry = create_test_registry();
3608        let executor = MockToolExecutor::no_tools();
3609
3610        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3611        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3612        let cid = memory.sqlite().create_conversation().await.unwrap();
3613
3614        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3615            .with_metrics(tx)
3616            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3617        agent.memory_state.persistence.autosave_assistant = true;
3618        agent.memory_state.persistence.autosave_min_length = 0;
3619
3620        let parts = vec![MessagePart::ToolResult {
3621            tool_use_id: "tu1".into(),
3622            content: "[skipped] bash tool was blocked by utility gate".into(),
3623            is_error: false,
3624        }];
3625
3626        agent
3627            .persist_message(
3628                Role::User,
3629                "[skipped] bash tool was blocked by utility gate",
3630                &parts,
3631                false,
3632            )
3633            .await;
3634
3635        assert_eq!(
3636            rx.borrow().embeddings_generated,
3637            0,
3638            "[skipped] ToolResult must not be embedded into Qdrant"
3639        );
3640    }
3641
3642    #[tokio::test]
3643    async fn persist_message_stopped_tool_result_does_not_embed() {
3644        use zeph_llm::provider::MessagePart;
3645
3646        let provider = mock_provider(vec![]);
3647        let channel = MockChannel::new(vec![]);
3648        let registry = create_test_registry();
3649        let executor = MockToolExecutor::no_tools();
3650
3651        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3652        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3653        let cid = memory.sqlite().create_conversation().await.unwrap();
3654
3655        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3656            .with_metrics(tx)
3657            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3658        agent.memory_state.persistence.autosave_assistant = true;
3659        agent.memory_state.persistence.autosave_min_length = 0;
3660
3661        let parts = vec![MessagePart::ToolResult {
3662            tool_use_id: "tu2".into(),
3663            content: "[stopped] execution limit reached".into(),
3664            is_error: false,
3665        }];
3666
3667        agent
3668            .persist_message(
3669                Role::User,
3670                "[stopped] execution limit reached",
3671                &parts,
3672                false,
3673            )
3674            .await;
3675
3676        assert_eq!(
3677            rx.borrow().embeddings_generated,
3678            0,
3679            "[stopped] ToolResult must not be embedded into Qdrant"
3680        );
3681    }
3682
3683    #[tokio::test]
3684    async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3685        // Regression: a normal ToolResult (no [skipped]/[stopped] prefix) must not be
3686        // suppressed by the utility-gate guard and must reach the save path (SQLite).
3687        use zeph_llm::provider::MessagePart;
3688
3689        let provider = mock_provider(vec![]);
3690        let channel = MockChannel::new(vec![]);
3691        let registry = create_test_registry();
3692        let executor = MockToolExecutor::no_tools();
3693
3694        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3695        let cid = memory.sqlite().create_conversation().await.unwrap();
3696        let memory_arc = std::sync::Arc::new(memory);
3697
3698        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3699            memory_arc.clone(),
3700            cid,
3701            50,
3702            5,
3703            100,
3704        );
3705        agent.memory_state.persistence.autosave_assistant = true;
3706        agent.memory_state.persistence.autosave_min_length = 0;
3707
3708        let content = "total 42\ndrwxr-xr-x  5 user group";
3709        let parts = vec![MessagePart::ToolResult {
3710            tool_use_id: "tu3".into(),
3711            content: content.into(),
3712            is_error: false,
3713        }];
3714
3715        agent
3716            .persist_message(Role::User, content, &parts, false)
3717            .await;
3718
3719        // Must be saved to SQLite — confirms the guard did not block this path.
3720        let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3721        assert_eq!(
3722            history.len(),
3723            1,
3724            "normal ToolResult must be saved to SQLite"
3725        );
3726        assert_eq!(history[0].content, content);
3727    }
3728
3729    /// Verify that `enqueue_trajectory_extraction_task` uses a bounded tail slice instead of
3730    /// cloning the full message vec. We confirm the slice logic by checking that the
3731    /// `tail_start` calculation correctly bounds the window even with more messages than
3732    /// `max_messages`.
3733    #[test]
3734    fn trajectory_extraction_slice_bounds_messages() {
3735        // Replicate the slice logic from enqueue_trajectory_extraction_task.
3736        let max_messages: usize = 20;
3737        let total_messages = 100usize;
3738
3739        let tail_start = total_messages.saturating_sub(max_messages);
3740        let window = total_messages - tail_start;
3741
3742        assert_eq!(
3743            window, 20,
3744            "slice should contain exactly max_messages items"
3745        );
3746        assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3747    }
3748
3749    #[test]
3750    fn trajectory_extraction_slice_handles_few_messages() {
3751        let max_messages: usize = 20;
3752        let total_messages = 5usize;
3753
3754        let tail_start = total_messages.saturating_sub(max_messages);
3755        let window = total_messages - tail_start;
3756
3757        assert_eq!(window, 5, "should return all messages when fewer than max");
3758        assert_eq!(tail_start, 0, "slice should start from the beginning");
3759    }
3760}