Skip to main content

zeph_core/agent/
persistence.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashSet;
5
6use crate::channel::Channel;
7use zeph_llm::provider::{LlmProvider as _, Message, MessagePart, Role};
8use zeph_memory::store::role_str;
9
10use super::Agent;
11
12/// Remove orphaned `ToolUse`/`ToolResult` messages from restored history.
13///
14/// Four failure modes are handled:
15/// 1. Trailing orphan: the last message is an assistant with `ToolUse` parts but no subsequent
16///    user message with `ToolResult` — caused by LIMIT boundary splits or interrupted sessions.
17/// 2. Leading orphan: the first message is a user with `ToolResult` parts but no preceding
18///    assistant message with `ToolUse` — caused by LIMIT boundary cuts.
19/// 3. Mid-history orphaned `ToolUse`: an assistant message with `ToolUse` parts is not followed
20///    by a user message with matching `ToolResult` parts. The `ToolUse` parts are stripped;
21///    if no content remains the message is removed.
22/// 4. Mid-history orphaned `ToolResult`: a user message has `ToolResult` parts whose
23///    `tool_use_id` is not present in the preceding assistant message. Those `ToolResult` parts
24///    are stripped; if no content remains the message is removed.
25///
26/// Boundary cases are resolved in a loop before the mid-history scan runs.
27fn sanitize_tool_pairs(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
28    let mut removed = 0;
29    let mut db_ids: Vec<i64> = Vec::new();
30
31    loop {
32        // Remove trailing orphaned tool_use (assistant message with ToolUse, no following tool_result).
33        if let Some(last) = messages.last()
34            && last.role == Role::Assistant
35            && last
36                .parts
37                .iter()
38                .any(|p| matches!(p, MessagePart::ToolUse { .. }))
39        {
40            let ids: Vec<String> = last
41                .parts
42                .iter()
43                .filter_map(|p| {
44                    if let MessagePart::ToolUse { id, .. } = p {
45                        Some(id.clone())
46                    } else {
47                        None
48                    }
49                })
50                .collect();
51            tracing::warn!(
52                tool_ids = ?ids,
53                "removing orphaned trailing tool_use message from restored history"
54            );
55            if let Some(db_id) = messages.last().and_then(|m| m.metadata.db_id) {
56                db_ids.push(db_id);
57            }
58            messages.pop();
59            removed += 1;
60            continue;
61        }
62
63        // Remove leading orphaned tool_result (user message with ToolResult, no preceding tool_use).
64        if let Some(first) = messages.first()
65            && first.role == Role::User
66            && first
67                .parts
68                .iter()
69                .any(|p| matches!(p, MessagePart::ToolResult { .. }))
70        {
71            let ids: Vec<String> = first
72                .parts
73                .iter()
74                .filter_map(|p| {
75                    if let MessagePart::ToolResult { tool_use_id, .. } = p {
76                        Some(tool_use_id.clone())
77                    } else {
78                        None
79                    }
80                })
81                .collect();
82            tracing::warn!(
83                tool_use_ids = ?ids,
84                "removing orphaned leading tool_result message from restored history"
85            );
86            if let Some(db_id) = messages.first().and_then(|m| m.metadata.db_id) {
87                db_ids.push(db_id);
88            }
89            messages.remove(0);
90            removed += 1;
91            continue;
92        }
93
94        break;
95    }
96
97    // Mid-history scan: strip ToolUse parts from any assistant message whose tool IDs are not
98    // matched by ToolResult parts in the immediately following user message.
99    let (mid_removed, mid_db_ids) = strip_mid_history_orphans(messages);
100    removed += mid_removed;
101    db_ids.extend(mid_db_ids);
102
103    (removed, db_ids)
104}
105
106/// Collect `tool_use` IDs from `msg` that have no matching `ToolResult` in `next_msg`.
107fn orphaned_tool_use_ids(msg: &Message, next_msg: Option<&Message>) -> HashSet<String> {
108    let matched: HashSet<String> = next_msg
109        .filter(|n| n.role == Role::User)
110        .map(|n| {
111            msg.parts
112                .iter()
113                .filter_map(|p| if let MessagePart::ToolUse { id, .. } = p { Some(id.clone()) } else { None })
114                .filter(|uid| n.parts.iter().any(|np| matches!(np, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == uid)))
115                .collect()
116        })
117        .unwrap_or_default();
118    msg.parts
119        .iter()
120        .filter_map(|p| {
121            if let MessagePart::ToolUse { id, .. } = p
122                && !matched.contains(id)
123            {
124                Some(id.clone())
125            } else {
126                None
127            }
128        })
129        .collect()
130}
131
132/// Collect `tool_result` IDs from `msg` that have no matching `ToolUse` in `prev_msg`.
133fn orphaned_tool_result_ids(msg: &Message, prev_msg: Option<&Message>) -> HashSet<String> {
134    let avail: HashSet<&str> = prev_msg
135        .filter(|p| p.role == Role::Assistant)
136        .map(|p| {
137            p.parts
138                .iter()
139                .filter_map(|part| {
140                    if let MessagePart::ToolUse { id, .. } = part {
141                        Some(id.as_str())
142                    } else {
143                        None
144                    }
145                })
146                .collect()
147        })
148        .unwrap_or_default();
149    msg.parts
150        .iter()
151        .filter_map(|p| {
152            if let MessagePart::ToolResult { tool_use_id, .. } = p
153                && !avail.contains(tool_use_id.as_str())
154            {
155                Some(tool_use_id.clone())
156            } else {
157                None
158            }
159        })
160        .collect()
161}
162
163/// Returns `true` if `content` contains human-readable text beyond legacy tool bracket markers.
164///
165/// Legacy markers produced by `Message::flatten_parts` are:
166/// - `[tool_use: name(id)]` — assistant `ToolUse`
167/// - `[tool_result: id]\nbody` — user `ToolResult` (tag + trailing body up to the next tag)
168/// - `[tool output: name] body` — `ToolOutput` (pruned or inline)
169/// - `[tool output: name]\n```\nbody\n``` ` — `ToolOutput` fenced block
170///
171/// A message whose content consists solely of such markers (and whitespace) has no
172/// user-visible text and is a candidate for soft-delete once its structured `parts` are gone.
173///
174/// Conservative rule: if a tag is malformed (no closing `]`), the content is treated as
175/// meaningful and the message is NOT deleted.
176///
177/// Note: `[image: mime, N bytes]` placeholders are intentionally treated as meaningful because
178/// they represent real media content and are not pure tool-execution artifacts.
179///
180/// Note: the Claude request-builder format `[tool_use: name] {json_input}` is used only for
181/// API payload construction and is never written to `SQLite` — it cannot appear in persisted
182/// message content, so no special handling is needed here.
183fn has_meaningful_content(content: &str) -> bool {
184    const PREFIXES: [&str; 3] = ["[tool_use: ", "[tool_result: ", "[tool output: "];
185
186    let mut remaining = content.trim();
187
188    loop {
189        // Find the earliest occurrence of any tool tag prefix.
190        let next = PREFIXES
191            .iter()
192            .filter_map(|prefix| remaining.find(prefix).map(|pos| (pos, *prefix)))
193            .min_by_key(|(pos, _)| *pos);
194
195        let Some((start, prefix)) = next else {
196            // No more tool tags — whatever remains decides the verdict.
197            break;
198        };
199
200        // Any non-whitespace text before this tag is meaningful.
201        if !remaining[..start].trim().is_empty() {
202            return true;
203        }
204
205        // Advance past the prefix to find the closing `]`.
206        let after_prefix = &remaining[start + prefix.len()..];
207        let Some(close) = after_prefix.find(']') else {
208            // Malformed tag (no closing bracket) — treat as meaningful, do not delete.
209            return true;
210        };
211
212        // Position after the `]`.
213        let tag_end = start + prefix.len() + close + 1;
214
215        if prefix == "[tool_result: " || prefix == "[tool output: " {
216            // Skip the body that immediately follows until the next tool tag prefix or end-of-string.
217            // The body is part of the tool artifact, not human-readable content.
218            let body = remaining[tag_end..].trim_start_matches('\n');
219            let next_tag = PREFIXES
220                .iter()
221                .filter_map(|p| body.find(p))
222                .min()
223                .unwrap_or(body.len());
224            remaining = &body[next_tag..];
225        } else {
226            remaining = &remaining[tag_end..];
227        }
228    }
229
230    !remaining.trim().is_empty()
231}
232
233/// Scan all messages and strip orphaned `ToolUse`/`ToolResult` parts from mid-history messages.
234///
235/// Two directions are checked:
236/// - Forward: assistant message has `ToolUse` parts not matched by `ToolResult` in the next user
237///   message — strip those `ToolUse` parts.
238/// - Reverse: user message has `ToolResult` parts whose `tool_use_id` is not present as a
239///   `ToolUse` in the preceding assistant message — strip those `ToolResult` parts.
240///
241/// Text parts are preserved; messages with no remaining content are removed entirely.
242///
243/// Returns `(count, db_ids)` where `count` is the number of messages removed entirely and
244/// `db_ids` contains the `metadata.db_id` values of those removed messages (for DB cleanup).
245fn strip_mid_history_orphans(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
246    let mut removed = 0;
247    let mut db_ids: Vec<i64> = Vec::new();
248    let mut i = 0;
249    while i < messages.len() {
250        // Forward pass: strip ToolUse parts from assistant messages that lack a matching
251        // ToolResult in the next user message. Only orphaned IDs are stripped — other ToolUse
252        // parts in the same message that DO have a matching ToolResult are preserved.
253        if messages[i].role == Role::Assistant
254            && messages[i]
255                .parts
256                .iter()
257                .any(|p| matches!(p, MessagePart::ToolUse { .. }))
258        {
259            let next_non_system = (i + 1..messages.len())
260                .find(|&j| messages[j].role != Role::System)
261                .and_then(|j| messages.get(j));
262            let orphaned_ids = orphaned_tool_use_ids(&messages[i], next_non_system);
263            if !orphaned_ids.is_empty() {
264                tracing::warn!(
265                    tool_ids = ?orphaned_ids,
266                    index = i,
267                    "stripping orphaned mid-history tool_use parts from assistant message"
268                );
269                messages[i].parts.retain(
270                    |p| !matches!(p, MessagePart::ToolUse { id, .. } if orphaned_ids.contains(id)),
271                );
272                let is_empty =
273                    !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
274                if is_empty {
275                    if let Some(db_id) = messages[i].metadata.db_id {
276                        db_ids.push(db_id);
277                    }
278                    messages.remove(i);
279                    removed += 1;
280                    continue; // Do not advance i — the next message is now at position i.
281                }
282            }
283        }
284
285        // Reverse pass: user ToolResult without matching ToolUse in the preceding assistant message.
286        if messages[i].role == Role::User
287            && messages[i]
288                .parts
289                .iter()
290                .any(|p| matches!(p, MessagePart::ToolResult { .. }))
291        {
292            let prev_non_system = (0..i)
293                .rev()
294                .find(|&j| messages[j].role != Role::System)
295                .and_then(|j| messages.get(j));
296            let orphaned_ids = orphaned_tool_result_ids(&messages[i], prev_non_system);
297            if !orphaned_ids.is_empty() {
298                tracing::warn!(
299                    tool_use_ids = ?orphaned_ids,
300                    index = i,
301                    "stripping orphaned mid-history tool_result parts from user message"
302                );
303                messages[i].parts.retain(|p| {
304                    !matches!(p, MessagePart::ToolResult { tool_use_id, .. } if orphaned_ids.contains(tool_use_id.as_str()))
305                });
306
307                let is_empty =
308                    !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
309                if is_empty {
310                    if let Some(db_id) = messages[i].metadata.db_id {
311                        db_ids.push(db_id);
312                    }
313                    messages.remove(i);
314                    removed += 1;
315                    // Do not advance i — the next message is now at position i.
316                    continue;
317                }
318            }
319        }
320
321        i += 1;
322    }
323    (removed, db_ids)
324}
325
326impl<C: Channel> Agent<C> {
327    /// Load conversation history from memory and inject into messages.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if loading history from `SQLite` fails.
332    pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
333        let (Some(memory), Some(cid)) = (
334            &self.memory_state.persistence.memory,
335            self.memory_state.persistence.conversation_id,
336        ) else {
337            return Ok(());
338        };
339
340        let history = memory
341            .sqlite()
342            .load_history_filtered(
343                cid,
344                self.memory_state.persistence.history_limit,
345                Some(true),
346                None,
347            )
348            .await?;
349        if !history.is_empty() {
350            let mut loaded = 0;
351            let mut skipped = 0;
352
353            for msg in history {
354                // Only skip messages that have neither text content nor structured parts.
355                // Native tool calls produce user messages with empty `content` but non-empty
356                // `parts` (containing ToolResult). Skipping them here would orphan the
357                // preceding assistant ToolUse before sanitize_tool_pairs can clean it up.
358                if !has_meaningful_content(&msg.content) && msg.parts.is_empty() {
359                    tracing::warn!("skipping empty message from history (role: {:?})", msg.role);
360                    skipped += 1;
361                    continue;
362                }
363                self.msg.messages.push(msg);
364                loaded += 1;
365            }
366
367            // Determine the start index of just-loaded messages (system prompt is at index 0).
368            let history_start = self.msg.messages.len() - loaded;
369            let mut restored_slice = self.msg.messages.split_off(history_start);
370            let (orphans, orphan_db_ids) = sanitize_tool_pairs(&mut restored_slice);
371            skipped += orphans;
372            loaded = loaded.saturating_sub(orphans);
373            self.msg.messages.append(&mut restored_slice);
374
375            if !orphan_db_ids.is_empty() {
376                let ids: Vec<zeph_memory::types::MessageId> = orphan_db_ids
377                    .iter()
378                    .map(|&id| zeph_memory::types::MessageId(id))
379                    .collect();
380                if let Err(e) = memory.sqlite().soft_delete_messages(&ids).await {
381                    tracing::warn!(
382                        count = ids.len(),
383                        error = %e,
384                        "failed to soft-delete orphaned tool-pair messages from DB"
385                    );
386                } else {
387                    tracing::debug!(
388                        count = ids.len(),
389                        "soft-deleted orphaned tool-pair messages from DB"
390                    );
391                }
392            }
393
394            tracing::info!("restored {loaded} message(s) from conversation {cid}");
395            if skipped > 0 {
396                tracing::warn!("skipped {skipped} empty/orphaned message(s) from history");
397            }
398
399            if loaded > 0 {
400                // Increment session counts so tier promotion can track cross-session access.
401                // Errors are non-fatal — promotion will simply use stale counts.
402                let _ = memory
403                    .sqlite()
404                    .increment_session_counts_for_conversation(cid)
405                    .await
406                    .inspect_err(|e| {
407                        tracing::warn!(error = %e, "failed to increment tier session counts");
408                    });
409            }
410        }
411
412        if let Ok(count) = memory.message_count(cid).await {
413            let count_u64 = u64::try_from(count).unwrap_or(0);
414            self.update_metrics(|m| {
415                m.sqlite_message_count = count_u64;
416            });
417        }
418
419        if let Ok(count) = memory.sqlite().count_semantic_facts().await {
420            let count_u64 = u64::try_from(count).unwrap_or(0);
421            self.update_metrics(|m| {
422                m.semantic_fact_count = count_u64;
423            });
424        }
425
426        if let Ok(count) = memory.unsummarized_message_count(cid).await {
427            self.memory_state.persistence.unsummarized_count = usize::try_from(count).unwrap_or(0);
428        }
429
430        self.recompute_prompt_tokens();
431        Ok(())
432    }
433
434    /// Persist a message to memory.
435    ///
436    /// `has_injection_flags` controls whether Qdrant embedding is skipped for this message.
437    /// When `true` and `guard_memory_writes` is enabled, only `SQLite` is written — the message
438    /// is saved for conversation continuity but will not pollute semantic search (M2, D2).
439    #[allow(clippy::too_many_lines)]
440    #[cfg_attr(
441        feature = "profiling",
442        tracing::instrument(name = "agent.persist_message", skip_all)
443    )]
444    pub(crate) async fn persist_message(
445        &mut self,
446        role: Role,
447        content: &str,
448        parts: &[MessagePart],
449        has_injection_flags: bool,
450    ) {
451        let (Some(memory), Some(cid)) = (
452            &self.memory_state.persistence.memory,
453            self.memory_state.persistence.conversation_id,
454        ) else {
455            return;
456        };
457
458        let parts_json = if parts.is_empty() {
459            "[]".to_string()
460        } else {
461            match serde_json::to_string(parts) {
462                Ok(json) => json,
463                Err(e) => {
464                    tracing::error!(
465                        role = ?role,
466                        parts_count = parts.len(),
467                        error = %e,
468                        "failed to serialize message parts — skipping persist to avoid orphaned tool pair"
469                    );
470                    return;
471                }
472            }
473        };
474
475        // M2: injection flag is passed explicitly to avoid stale mutable-bool state on Agent.
476        // When has_injection_flags=true, skip embedding to prevent poisoned content from
477        // polluting Qdrant semantic search results.
478        let guard_event = self
479            .security
480            .exfiltration_guard
481            .should_guard_memory_write(has_injection_flags);
482        if let Some(ref event) = guard_event {
483            tracing::warn!(
484                ?event,
485                "exfiltration guard: skipping Qdrant embedding for flagged content"
486            );
487            self.update_metrics(|m| m.exfiltration_memory_guards += 1);
488            self.push_security_event(
489                crate::metrics::SecurityEventCategory::ExfiltrationBlock,
490                "memory_write",
491                "Qdrant embedding skipped: flagged content",
492            );
493        }
494
495        let skip_embedding = guard_event.is_some();
496
497        // Do not embed [skipped] or [stopped] ToolResult content into Qdrant — these are
498        // internal policy markers that carry no useful semantic information and would
499        // contaminate memory_search results, causing the utility-gate Retrieve loop (#2620).
500        let has_skipped_tool_result = parts.iter().any(|p| {
501            if let MessagePart::ToolResult { content, .. } = p {
502                content.starts_with("[skipped]") || content.starts_with("[stopped]")
503            } else {
504                false
505            }
506        });
507
508        let should_embed = if skip_embedding || has_skipped_tool_result {
509            false
510        } else {
511            match role {
512                Role::Assistant => {
513                    self.memory_state.persistence.autosave_assistant
514                        && content.len() >= self.memory_state.persistence.autosave_min_length
515                }
516                _ => true,
517            }
518        };
519
520        let goal_text = self.memory_state.extraction.goal_text.clone();
521
522        tracing::debug!(
523            "persist_message: calling remember_with_parts, embed dispatched to background"
524        );
525        let (embedding_stored, was_persisted) = if should_embed {
526            match memory
527                .remember_with_parts(
528                    cid,
529                    role_str(role),
530                    content,
531                    &parts_json,
532                    goal_text.as_deref(),
533                )
534                .await
535            {
536                Ok((Some(message_id), stored)) => {
537                    self.msg.last_persisted_message_id = Some(message_id.0);
538                    (stored, true)
539                }
540                Ok((None, _)) => {
541                    // A-MAC admission rejected — skip increment and further processing.
542                    return;
543                }
544                Err(e) => {
545                    tracing::error!("failed to persist message: {e:#}");
546                    return;
547                }
548            }
549        } else {
550            match memory
551                .save_only(cid, role_str(role), content, &parts_json)
552                .await
553            {
554                Ok(message_id) => {
555                    self.msg.last_persisted_message_id = Some(message_id.0);
556                    (false, true)
557                }
558                Err(e) => {
559                    tracing::error!("failed to persist message: {e:#}");
560                    return;
561                }
562            }
563        };
564
565        if !was_persisted {
566            return;
567        }
568
569        self.memory_state.persistence.unsummarized_count += 1;
570
571        self.update_metrics(|m| {
572            m.sqlite_message_count += 1;
573            if embedding_stored {
574                m.embeddings_generated += 1;
575            }
576        });
577
578        tracing::debug!("persist_message: db insert complete, embedding running in background");
579        memory.reap_embed_tasks();
580
581        // Phase 2: enqueue enrichment tasks via supervisor (non-blocking).
582        // check_summarization signals completion via SummarizationSignal, consumed in reap()
583        // between turns — no shared mutable state across tasks (S1 fix).
584        self.enqueue_summarization_task();
585
586        // FIX-1: skip graph extraction for tool result messages — they contain raw structured
587        // output (TOML, JSON, code) that pollutes the entity graph with noise.
588        let has_tool_result_parts = parts
589            .iter()
590            .any(|p| matches!(p, MessagePart::ToolResult { .. }));
591
592        self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
593            .await;
594
595        // Persona extraction: run only for user messages that are not tool results and not injected.
596        if role == Role::User && !has_tool_result_parts && !has_injection_flags {
597            self.enqueue_persona_extraction_task();
598        }
599
600        // Trajectory extraction: run after turns that contained tool results.
601        if has_tool_result_parts {
602            self.enqueue_trajectory_extraction_task();
603        }
604    }
605
606    /// Enqueue background summarization via the supervisor (S1 fix: no shared `AtomicUsize`).
607    fn enqueue_summarization_task(&mut self) {
608        let (Some(memory), Some(cid)) = (
609            self.memory_state.persistence.memory.clone(),
610            self.memory_state.persistence.conversation_id,
611        ) else {
612            return;
613        };
614
615        if self.memory_state.persistence.unsummarized_count
616            <= self.memory_state.compaction.summarization_threshold
617        {
618            return;
619        }
620
621        let batch_size = self.memory_state.compaction.summarization_threshold / 2;
622
623        self.lifecycle.supervisor.spawn_summarization("summarization", async move {
624            match tokio::time::timeout(
625                std::time::Duration::from_secs(30),
626                memory.summarize(cid, batch_size),
627            )
628            .await
629            {
630                Ok(Ok(Some(summary_id))) => {
631                    tracing::info!(
632                        "background summarization: created summary {summary_id} for conversation {cid}"
633                    );
634                    true
635                }
636                Ok(Ok(None)) => {
637                    tracing::debug!("background summarization: no summarization needed");
638                    false
639                }
640                Ok(Err(e)) => {
641                    tracing::error!("background summarization failed: {e:#}");
642                    false
643                }
644                Err(_) => {
645                    tracing::warn!("background summarization timed out after 30s");
646                    false
647                }
648            }
649        });
650    }
651
652    /// Prepare graph extraction guards in foreground, then enqueue heavy work via supervisor.
653    ///
654    /// Guards (enabled check, injection/tool-result skip) stay on the foreground path.
655    /// The RPE check and actual extraction run in background (S2: no `send_status`).
656    #[allow(clippy::too_many_lines)]
657    async fn enqueue_graph_extraction_task(
658        &mut self,
659        content: &str,
660        has_injection_flags: bool,
661        has_tool_result_parts: bool,
662    ) {
663        use zeph_memory::semantic::GraphExtractionConfig;
664
665        if self.memory_state.persistence.memory.is_none()
666            || self.memory_state.persistence.conversation_id.is_none()
667        {
668            return;
669        }
670        if has_tool_result_parts {
671            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
672            return;
673        }
674        if has_injection_flags {
675            tracing::warn!("graph extraction skipped: injection patterns detected in content");
676            return;
677        }
678
679        let extraction_cfg = {
680            let cfg = &self.memory_state.extraction.graph_config;
681            if !cfg.enabled {
682                return;
683            }
684            GraphExtractionConfig {
685                max_entities: cfg.max_entities_per_message,
686                max_edges: cfg.max_edges_per_message,
687                extraction_timeout_secs: cfg.extraction_timeout_secs,
688                community_refresh_interval: cfg.community_refresh_interval,
689                expired_edge_retention_days: cfg.expired_edge_retention_days,
690                max_entities_cap: cfg.max_entities,
691                community_summary_max_prompt_bytes: cfg.community_summary_max_prompt_bytes,
692                community_summary_concurrency: cfg.community_summary_concurrency,
693                lpa_edge_chunk_size: cfg.lpa_edge_chunk_size,
694                note_linking: zeph_memory::NoteLinkingConfig {
695                    enabled: cfg.note_linking.enabled,
696                    similarity_threshold: cfg.note_linking.similarity_threshold,
697                    top_k: cfg.note_linking.top_k,
698                    timeout_secs: cfg.note_linking.timeout_secs,
699                },
700                link_weight_decay_lambda: cfg.link_weight_decay_lambda,
701                link_weight_decay_interval_secs: cfg.link_weight_decay_interval_secs,
702                belief_revision_enabled: cfg.belief_revision.enabled,
703                belief_revision_similarity_threshold: cfg.belief_revision.similarity_threshold,
704                conversation_id: self.memory_state.persistence.conversation_id.map(|c| c.0),
705            }
706        };
707
708        // RPE check: embed + compute surprise score. Stays on foreground to avoid
709        // capturing the rpe_router mutex in a background task.
710        if self.rpe_should_skip(content).await {
711            tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
712            return;
713        }
714
715        let context_messages: Vec<String> = self
716            .msg
717            .messages
718            .iter()
719            .rev()
720            .filter(|m| {
721                m.role == Role::User
722                    && !m
723                        .parts
724                        .iter()
725                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
726            })
727            .take(4)
728            .map(|m| {
729                if m.content.len() > 2048 {
730                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
731                } else {
732                    m.content.clone()
733                }
734            })
735            .collect();
736
737        let Some(memory) = self.memory_state.persistence.memory.clone() else {
738            return;
739        };
740
741        let validator: zeph_memory::semantic::PostExtractValidator =
742            if self.security.memory_validator.is_enabled() {
743                let v = self.security.memory_validator.clone();
744                Some(Box::new(move |result| {
745                    v.validate_graph_extraction(result)
746                        .map_err(|e| e.to_string())
747                }))
748            } else {
749                None
750            };
751
752        let content_owned = content.to_owned();
753        let graph_store = memory.graph_store.clone();
754        let metrics_tx = self.metrics.metrics_tx.clone();
755        let start_time = self.lifecycle.start_time;
756
757        self.lifecycle.supervisor.spawn(
758            super::agent_supervisor::TaskClass::Enrichment,
759            "graph_extraction",
760            async move {
761                let extraction_handle = memory.spawn_graph_extraction(
762                    content_owned,
763                    context_messages,
764                    extraction_cfg,
765                    validator,
766                );
767
768                // After extraction completes, refresh graph count metrics (was Telemetry spawn).
769                if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
770                    let _ = extraction_handle.await;
771                    let (entities, edges, communities) = tokio::join!(
772                        store.entity_count(),
773                        store.active_edge_count(),
774                        store.community_count()
775                    );
776                    let elapsed = start_time.elapsed().as_secs();
777                    tx.send_modify(|m| {
778                        m.uptime_seconds = elapsed;
779                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
780                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
781                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
782                    });
783                } else {
784                    let _ = extraction_handle.await;
785                }
786
787                tracing::debug!("background graph extraction complete");
788            },
789        );
790
791        // Sync community failures and extraction metrics (cheap, foreground-safe).
792        self.sync_community_detection_failures();
793        self.sync_graph_extraction_metrics();
794        // sync_graph_counts and sync_guidelines_status are DB reads; move to Telemetry background.
795        let memory_for_sync = self.memory_state.persistence.memory.clone();
796        let metrics_tx_sync = self.metrics.metrics_tx.clone();
797        let start_time_sync = self.lifecycle.start_time;
798        let cid_sync = self.memory_state.persistence.conversation_id;
799        let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
800        let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
801        let guidelines_enabled = self.memory_state.extraction.graph_config.enabled;
802
803        self.lifecycle.supervisor.spawn(
804            super::agent_supervisor::TaskClass::Telemetry,
805            "graph_count_sync",
806            async move {
807                let Some(store) = graph_store_sync else {
808                    return;
809                };
810                let Some(tx) = metrics_tx_sync else { return };
811
812                let (entities, edges, communities) = tokio::join!(
813                    store.entity_count(),
814                    store.active_edge_count(),
815                    store.community_count()
816                );
817                let elapsed = start_time_sync.elapsed().as_secs();
818                tx.send_modify(|m| {
819                    m.uptime_seconds = elapsed;
820                    m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
821                    m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
822                    m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
823                });
824
825                // Sync guidelines status.
826                if guidelines_enabled && let Some(sqlite) = sqlite_sync {
827                    match tokio::time::timeout(
828                        std::time::Duration::from_secs(10),
829                        sqlite.load_compression_guidelines_meta(cid_sync),
830                    )
831                    .await
832                    {
833                        Ok(Ok((version, created_at))) => {
834                            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
835                            let version_u32 = u32::try_from(version).unwrap_or(0);
836                            tx.send_modify(|m| {
837                                m.guidelines_version = version_u32;
838                                m.guidelines_updated_at = created_at;
839                            });
840                        }
841                        Ok(Err(e)) => {
842                            tracing::debug!("guidelines status sync failed: {e:#}");
843                        }
844                        Err(_) => {
845                            tracing::debug!("guidelines status sync timed out");
846                        }
847                    }
848                }
849            },
850        );
851    }
852
853    /// Enqueue persona extraction via supervisor (background, no `send_status`).
854    fn enqueue_persona_extraction_task(&mut self) {
855        use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
856
857        let cfg = &self.memory_state.extraction.persona_config;
858        if !cfg.enabled {
859            return;
860        }
861
862        let Some(memory) = &self.memory_state.persistence.memory else {
863            return;
864        };
865
866        let user_messages: Vec<String> = self
867            .msg
868            .messages
869            .iter()
870            .filter(|m| {
871                m.role == Role::User
872                    && !m
873                        .parts
874                        .iter()
875                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
876            })
877            .take(8)
878            .map(|m| {
879                if m.content.len() > 2048 {
880                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
881                } else {
882                    m.content.clone()
883                }
884            })
885            .collect();
886
887        if user_messages.len() < cfg.min_messages {
888            return;
889        }
890
891        let timeout_secs = cfg.extraction_timeout_secs;
892        let extraction_cfg = PersonaExtractionConfig {
893            enabled: cfg.enabled,
894            min_messages: cfg.min_messages,
895            max_messages: cfg.max_messages,
896            extraction_timeout_secs: timeout_secs,
897        };
898
899        let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
900        let store = memory.sqlite().clone();
901        let conversation_id = self.memory_state.persistence.conversation_id.map(|c| c.0);
902
903        self.lifecycle.supervisor.spawn(
904            super::agent_supervisor::TaskClass::Enrichment,
905            "persona_extraction",
906            async move {
907                let user_message_refs: Vec<&str> =
908                    user_messages.iter().map(String::as_str).collect();
909                let fut = extract_persona_facts(
910                    &store,
911                    &provider,
912                    &user_message_refs,
913                    &extraction_cfg,
914                    conversation_id,
915                );
916                match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
917                {
918                    Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
919                    Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
920                    Err(_) => tracing::warn!(
921                        timeout_secs,
922                        "persona extraction timed out — no facts written this turn"
923                    ),
924                }
925            },
926        );
927    }
928
929    /// Enqueue trajectory extraction via supervisor (background).
930    fn enqueue_trajectory_extraction_task(&mut self) {
931        use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
932
933        let cfg = self.memory_state.extraction.trajectory_config.clone();
934        if !cfg.enabled {
935            return;
936        }
937
938        let Some(memory) = &self.memory_state.persistence.memory else {
939            return;
940        };
941
942        let conversation_id = match self.memory_state.persistence.conversation_id {
943            Some(cid) => cid.0,
944            None => return,
945        };
946
947        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
948        let turn_messages: Vec<zeph_llm::provider::Message> =
949            self.msg.messages[tail_start..].to_vec();
950
951        if turn_messages.is_empty() {
952            return;
953        }
954
955        let extraction_cfg = TrajectoryExtractionConfig {
956            enabled: cfg.enabled,
957            max_messages: cfg.max_messages,
958            extraction_timeout_secs: cfg.extraction_timeout_secs,
959        };
960
961        let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
962        let store = memory.sqlite().clone();
963        let min_confidence = cfg.min_confidence;
964
965        self.lifecycle.supervisor.spawn(
966            super::agent_supervisor::TaskClass::Enrichment,
967            "trajectory_extraction",
968            async move {
969                let entries =
970                    match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
971                        .await
972                    {
973                        Ok(e) => e,
974                        Err(e) => {
975                            tracing::warn!(error = %e, "trajectory extraction failed");
976                            return;
977                        }
978                    };
979
980                let last_id = store
981                    .trajectory_last_extracted_message_id(conversation_id)
982                    .await
983                    .unwrap_or(0);
984
985                let mut max_id = last_id;
986                for entry in &entries {
987                    if entry.confidence < min_confidence {
988                        continue;
989                    }
990                    let tools_json = serde_json::to_string(&entry.tools_used)
991                        .unwrap_or_else(|_| "[]".to_string());
992                    match store
993                        .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
994                            conversation_id: Some(conversation_id),
995                            turn_index: 0,
996                            kind: &entry.kind,
997                            intent: &entry.intent,
998                            outcome: &entry.outcome,
999                            tools_used: &tools_json,
1000                            confidence: entry.confidence,
1001                        })
1002                        .await
1003                    {
1004                        Ok(id) => {
1005                            if id > max_id {
1006                                max_id = id;
1007                            }
1008                        }
1009                        Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
1010                    }
1011                }
1012
1013                if max_id > last_id {
1014                    let _ = store
1015                        .set_trajectory_last_extracted_message_id(conversation_id, max_id)
1016                        .await;
1017                }
1018
1019                tracing::debug!(
1020                    count = entries.len(),
1021                    conversation_id,
1022                    "trajectory extraction complete"
1023                );
1024            },
1025        );
1026    }
1027
1028    /// D-MEM RPE check: returns `true` when the current turn should skip graph extraction.
1029    ///
1030    /// Embeds `content`, computes RPE via the router, and updates the router state.
1031    /// Returns `false` (do not skip) on any error — conservative fallback.
1032    async fn rpe_should_skip(&mut self, content: &str) -> bool {
1033        let Some(ref rpe_mutex) = self.memory_state.extraction.rpe_router else {
1034            return false;
1035        };
1036        let Some(memory) = &self.memory_state.persistence.memory else {
1037            return false;
1038        };
1039        let candidates = zeph_memory::extract_candidate_entities(content);
1040        let provider = memory.provider();
1041        let Ok(Ok(emb_vec)) =
1042            tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
1043        else {
1044            return false; // embed failed/timed out → extract
1045        };
1046        if let Ok(mut router) = rpe_mutex.lock() {
1047            let signal = router.compute(&emb_vec, &candidates);
1048            router.push_embedding(emb_vec);
1049            router.push_entities(&candidates);
1050            !signal.should_extract
1051        } else {
1052            tracing::warn!("rpe_router mutex poisoned; falling through to extract");
1053            false
1054        }
1055    }
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060    use super::super::agent_tests::{
1061        MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
1062    };
1063    use super::*;
1064    use zeph_llm::any::AnyProvider;
1065    use zeph_memory::semantic::SemanticMemory;
1066
1067    async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
1068        SemanticMemory::new(
1069            ":memory:",
1070            "http://127.0.0.1:1",
1071            provider.clone(),
1072            "test-model",
1073        )
1074        .await
1075        .unwrap()
1076    }
1077
1078    #[tokio::test]
1079    async fn load_history_without_memory_returns_ok() {
1080        let provider = mock_provider(vec![]);
1081        let channel = MockChannel::new(vec![]);
1082        let registry = create_test_registry();
1083        let executor = MockToolExecutor::no_tools();
1084        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1085
1086        let result = agent.load_history().await;
1087        assert!(result.is_ok());
1088        // No messages added when no memory is configured
1089        assert_eq!(agent.msg.messages.len(), 1); // system prompt only
1090    }
1091
1092    #[tokio::test]
1093    async fn load_history_with_messages_injects_into_agent() {
1094        let provider = mock_provider(vec![]);
1095        let channel = MockChannel::new(vec![]);
1096        let registry = create_test_registry();
1097        let executor = MockToolExecutor::no_tools();
1098
1099        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1100        let cid = memory.sqlite().create_conversation().await.unwrap();
1101
1102        memory
1103            .sqlite()
1104            .save_message(cid, "user", "hello from history")
1105            .await
1106            .unwrap();
1107        memory
1108            .sqlite()
1109            .save_message(cid, "assistant", "hi back")
1110            .await
1111            .unwrap();
1112
1113        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1114            std::sync::Arc::new(memory),
1115            cid,
1116            50,
1117            5,
1118            100,
1119        );
1120
1121        let messages_before = agent.msg.messages.len();
1122        agent.load_history().await.unwrap();
1123        // Two messages were added from history
1124        assert_eq!(agent.msg.messages.len(), messages_before + 2);
1125    }
1126
1127    #[tokio::test]
1128    async fn load_history_skips_empty_messages() {
1129        let provider = mock_provider(vec![]);
1130        let channel = MockChannel::new(vec![]);
1131        let registry = create_test_registry();
1132        let executor = MockToolExecutor::no_tools();
1133
1134        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1135        let cid = memory.sqlite().create_conversation().await.unwrap();
1136
1137        // Save an empty message (should be skipped) and a valid one
1138        memory
1139            .sqlite()
1140            .save_message(cid, "user", "   ")
1141            .await
1142            .unwrap();
1143        memory
1144            .sqlite()
1145            .save_message(cid, "user", "real message")
1146            .await
1147            .unwrap();
1148
1149        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1150            std::sync::Arc::new(memory),
1151            cid,
1152            50,
1153            5,
1154            100,
1155        );
1156
1157        let messages_before = agent.msg.messages.len();
1158        agent.load_history().await.unwrap();
1159        // Only the non-empty message is loaded
1160        assert_eq!(agent.msg.messages.len(), messages_before + 1);
1161    }
1162
1163    #[tokio::test]
1164    async fn load_history_with_empty_store_returns_ok() {
1165        let provider = mock_provider(vec![]);
1166        let channel = MockChannel::new(vec![]);
1167        let registry = create_test_registry();
1168        let executor = MockToolExecutor::no_tools();
1169
1170        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1171        let cid = memory.sqlite().create_conversation().await.unwrap();
1172
1173        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1174            std::sync::Arc::new(memory),
1175            cid,
1176            50,
1177            5,
1178            100,
1179        );
1180
1181        let messages_before = agent.msg.messages.len();
1182        agent.load_history().await.unwrap();
1183        // No messages added — empty history
1184        assert_eq!(agent.msg.messages.len(), messages_before);
1185    }
1186
1187    #[tokio::test]
1188    async fn load_history_increments_session_count_for_existing_messages() {
1189        let provider = mock_provider(vec![]);
1190        let channel = MockChannel::new(vec![]);
1191        let registry = create_test_registry();
1192        let executor = MockToolExecutor::no_tools();
1193
1194        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1195        let cid = memory.sqlite().create_conversation().await.unwrap();
1196
1197        // Save two messages — they start with session_count = 0.
1198        let id1 = memory
1199            .sqlite()
1200            .save_message(cid, "user", "hello")
1201            .await
1202            .unwrap();
1203        let id2 = memory
1204            .sqlite()
1205            .save_message(cid, "assistant", "hi")
1206            .await
1207            .unwrap();
1208
1209        let memory_arc = std::sync::Arc::new(memory);
1210        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1211            memory_arc.clone(),
1212            cid,
1213            50,
1214            5,
1215            100,
1216        );
1217
1218        agent.load_history().await.unwrap();
1219
1220        // Both episodic messages must have session_count = 1 after restore.
1221        let counts: Vec<i64> = zeph_db::query_scalar(
1222            "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
1223        )
1224        .bind(id1)
1225        .bind(id2)
1226        .fetch_all(memory_arc.sqlite().pool())
1227        .await
1228        .unwrap();
1229        assert_eq!(
1230            counts,
1231            vec![1, 1],
1232            "session_count must be 1 after first restore"
1233        );
1234    }
1235
1236    #[tokio::test]
1237    async fn load_history_does_not_increment_session_count_for_new_conversation() {
1238        let provider = mock_provider(vec![]);
1239        let channel = MockChannel::new(vec![]);
1240        let registry = create_test_registry();
1241        let executor = MockToolExecutor::no_tools();
1242
1243        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1244        let cid = memory.sqlite().create_conversation().await.unwrap();
1245
1246        // No messages saved — empty conversation.
1247        let memory_arc = std::sync::Arc::new(memory);
1248        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1249            memory_arc.clone(),
1250            cid,
1251            50,
1252            5,
1253            100,
1254        );
1255
1256        agent.load_history().await.unwrap();
1257
1258        // No rows → no session_count increments → query returns empty.
1259        let counts: Vec<i64> =
1260            zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
1261                .bind(cid)
1262                .fetch_all(memory_arc.sqlite().pool())
1263                .await
1264                .unwrap();
1265        assert!(counts.is_empty(), "new conversation must have no messages");
1266    }
1267
1268    #[tokio::test]
1269    async fn persist_message_without_memory_silently_returns() {
1270        // No memory configured — persist_message must not panic
1271        let provider = mock_provider(vec![]);
1272        let channel = MockChannel::new(vec![]);
1273        let registry = create_test_registry();
1274        let executor = MockToolExecutor::no_tools();
1275        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1276
1277        // Must not panic and must complete
1278        agent.persist_message(Role::User, "hello", &[], false).await;
1279    }
1280
1281    #[tokio::test]
1282    async fn persist_message_assistant_autosave_false_uses_save_only() {
1283        let provider = mock_provider(vec![]);
1284        let channel = MockChannel::new(vec![]);
1285        let registry = create_test_registry();
1286        let executor = MockToolExecutor::no_tools();
1287
1288        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1289        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1290        let cid = memory.sqlite().create_conversation().await.unwrap();
1291
1292        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1293            .with_metrics(tx)
1294            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1295        agent.memory_state.persistence.autosave_assistant = false;
1296        agent.memory_state.persistence.autosave_min_length = 20;
1297
1298        agent
1299            .persist_message(Role::Assistant, "short assistant reply", &[], false)
1300            .await;
1301
1302        let history = agent
1303            .memory_state
1304            .persistence
1305            .memory
1306            .as_ref()
1307            .unwrap()
1308            .sqlite()
1309            .load_history(cid, 50)
1310            .await
1311            .unwrap();
1312        assert_eq!(history.len(), 1, "message must be saved");
1313        assert_eq!(history[0].content, "short assistant reply");
1314        // embeddings_generated must remain 0 — save_only path does not embed
1315        assert_eq!(rx.borrow().embeddings_generated, 0);
1316    }
1317
1318    #[tokio::test]
1319    async fn persist_message_assistant_below_min_length_uses_save_only() {
1320        let provider = mock_provider(vec![]);
1321        let channel = MockChannel::new(vec![]);
1322        let registry = create_test_registry();
1323        let executor = MockToolExecutor::no_tools();
1324
1325        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1326        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1327        let cid = memory.sqlite().create_conversation().await.unwrap();
1328
1329        // autosave_assistant=true but min_length=1000 — short content falls back to save_only
1330        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1331            .with_metrics(tx)
1332            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1333        agent.memory_state.persistence.autosave_assistant = true;
1334        agent.memory_state.persistence.autosave_min_length = 1000;
1335
1336        agent
1337            .persist_message(Role::Assistant, "too short", &[], false)
1338            .await;
1339
1340        let history = agent
1341            .memory_state
1342            .persistence
1343            .memory
1344            .as_ref()
1345            .unwrap()
1346            .sqlite()
1347            .load_history(cid, 50)
1348            .await
1349            .unwrap();
1350        assert_eq!(history.len(), 1, "message must be saved");
1351        assert_eq!(history[0].content, "too short");
1352        assert_eq!(rx.borrow().embeddings_generated, 0);
1353    }
1354
1355    #[tokio::test]
1356    async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1357        // content.len() == autosave_min_length → should_embed = true (>= boundary).
1358        let provider = mock_provider(vec![]);
1359        let channel = MockChannel::new(vec![]);
1360        let registry = create_test_registry();
1361        let executor = MockToolExecutor::no_tools();
1362
1363        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1364        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1365        let cid = memory.sqlite().create_conversation().await.unwrap();
1366
1367        let min_length = 10usize;
1368        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1369            .with_metrics(tx)
1370            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1371        agent.memory_state.persistence.autosave_assistant = true;
1372        agent.memory_state.persistence.autosave_min_length = min_length;
1373
1374        // Exact boundary: len == min_length → embed path.
1375        let content_at_boundary = "A".repeat(min_length);
1376        assert_eq!(content_at_boundary.len(), min_length);
1377        agent
1378            .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1379            .await;
1380
1381        // sqlite_message_count must be incremented regardless of embedding success.
1382        assert_eq!(rx.borrow().sqlite_message_count, 1);
1383    }
1384
1385    #[tokio::test]
1386    async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1387        // content.len() == autosave_min_length - 1 → should_embed = false (below boundary).
1388        let provider = mock_provider(vec![]);
1389        let channel = MockChannel::new(vec![]);
1390        let registry = create_test_registry();
1391        let executor = MockToolExecutor::no_tools();
1392
1393        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1394        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1395        let cid = memory.sqlite().create_conversation().await.unwrap();
1396
1397        let min_length = 10usize;
1398        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1399            .with_metrics(tx)
1400            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1401        agent.memory_state.persistence.autosave_assistant = true;
1402        agent.memory_state.persistence.autosave_min_length = min_length;
1403
1404        // One below boundary: len == min_length - 1 → save_only path, no embedding.
1405        let content_below_boundary = "A".repeat(min_length - 1);
1406        assert_eq!(content_below_boundary.len(), min_length - 1);
1407        agent
1408            .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1409            .await;
1410
1411        let history = agent
1412            .memory_state
1413            .persistence
1414            .memory
1415            .as_ref()
1416            .unwrap()
1417            .sqlite()
1418            .load_history(cid, 50)
1419            .await
1420            .unwrap();
1421        assert_eq!(history.len(), 1, "message must still be saved");
1422        // save_only path does not embed.
1423        assert_eq!(rx.borrow().embeddings_generated, 0);
1424    }
1425
1426    #[tokio::test]
1427    async fn persist_message_increments_unsummarized_count() {
1428        let provider = mock_provider(vec![]);
1429        let channel = MockChannel::new(vec![]);
1430        let registry = create_test_registry();
1431        let executor = MockToolExecutor::no_tools();
1432
1433        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1434        let cid = memory.sqlite().create_conversation().await.unwrap();
1435
1436        // threshold=100 ensures no summarization is triggered
1437        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1438            std::sync::Arc::new(memory),
1439            cid,
1440            50,
1441            5,
1442            100,
1443        );
1444
1445        assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1446
1447        agent.persist_message(Role::User, "first", &[], false).await;
1448        assert_eq!(agent.memory_state.persistence.unsummarized_count, 1);
1449
1450        agent
1451            .persist_message(Role::User, "second", &[], false)
1452            .await;
1453        assert_eq!(agent.memory_state.persistence.unsummarized_count, 2);
1454    }
1455
1456    #[tokio::test]
1457    async fn check_summarization_resets_counter_on_success() {
1458        let provider = mock_provider(vec![]);
1459        let channel = MockChannel::new(vec![]);
1460        let registry = create_test_registry();
1461        let executor = MockToolExecutor::no_tools();
1462
1463        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1464        let cid = memory.sqlite().create_conversation().await.unwrap();
1465
1466        // threshold=1 so the second persist triggers summarization check (count > threshold)
1467        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1468            std::sync::Arc::new(memory),
1469            cid,
1470            50,
1471            5,
1472            1,
1473        );
1474
1475        agent.persist_message(Role::User, "msg1", &[], false).await;
1476        agent.persist_message(Role::User, "msg2", &[], false).await;
1477
1478        // After summarization attempt (summarize returns Ok(None) since no messages qualify),
1479        // the counter is NOT reset to 0 — only reset on Ok(Some(_)).
1480        // This verifies check_summarization is called and the guard condition works.
1481        // unsummarized_count must be >= 2 before any summarization or 0 if summarization ran.
1482        assert!(agent.memory_state.persistence.unsummarized_count <= 2);
1483    }
1484
1485    #[tokio::test]
1486    async fn unsummarized_count_not_incremented_without_memory() {
1487        let provider = mock_provider(vec![]);
1488        let channel = MockChannel::new(vec![]);
1489        let registry = create_test_registry();
1490        let executor = MockToolExecutor::no_tools();
1491        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1492
1493        agent.persist_message(Role::User, "hello", &[], false).await;
1494        // No memory configured — persist_message returns early, counter must stay 0.
1495        assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1496    }
1497
1498    // R-CRIT-01: unit tests for enqueue_graph_extraction_task guard conditions.
1499    mod graph_extraction_guards {
1500        use super::*;
1501        use crate::config::GraphConfig;
1502        use zeph_llm::provider::MessageMetadata;
1503        use zeph_memory::graph::GraphStore;
1504
1505        fn enabled_graph_config() -> GraphConfig {
1506            GraphConfig {
1507                enabled: true,
1508                ..GraphConfig::default()
1509            }
1510        }
1511
1512        async fn agent_with_graph(
1513            provider: &AnyProvider,
1514            config: GraphConfig,
1515        ) -> Agent<MockChannel> {
1516            let memory =
1517                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1518            let cid = memory.sqlite().create_conversation().await.unwrap();
1519            Agent::new(
1520                provider.clone(),
1521                MockChannel::new(vec![]),
1522                create_test_registry(),
1523                None,
1524                5,
1525                MockToolExecutor::no_tools(),
1526            )
1527            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1528            .with_graph_config(config)
1529        }
1530
1531        #[tokio::test]
1532        async fn injection_flag_guard_skips_extraction() {
1533            // has_injection_flags=true → extraction must be skipped; no counter in graph_metadata.
1534            let provider = mock_provider(vec![]);
1535            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1536            let pool = agent
1537                .memory_state
1538                .persistence
1539                .memory
1540                .as_ref()
1541                .unwrap()
1542                .sqlite()
1543                .pool()
1544                .clone();
1545
1546            agent
1547                .enqueue_graph_extraction_task("I use Rust", true, false)
1548                .await;
1549
1550            // Give any accidental spawn time to settle.
1551            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1552
1553            let store = GraphStore::new(pool);
1554            let count = store.get_metadata("extraction_count").await.unwrap();
1555            assert!(
1556                count.is_none(),
1557                "injection flag must prevent extraction_count from being written"
1558            );
1559        }
1560
1561        #[tokio::test]
1562        async fn disabled_config_guard_skips_extraction() {
1563            // graph.enabled=false → extraction must be skipped.
1564            let provider = mock_provider(vec![]);
1565            let disabled_cfg = GraphConfig {
1566                enabled: false,
1567                ..GraphConfig::default()
1568            };
1569            let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1570            let pool = agent
1571                .memory_state
1572                .persistence
1573                .memory
1574                .as_ref()
1575                .unwrap()
1576                .sqlite()
1577                .pool()
1578                .clone();
1579
1580            agent
1581                .enqueue_graph_extraction_task("I use Rust", false, false)
1582                .await;
1583
1584            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1585
1586            let store = GraphStore::new(pool);
1587            let count = store.get_metadata("extraction_count").await.unwrap();
1588            assert!(
1589                count.is_none(),
1590                "disabled graph config must prevent extraction"
1591            );
1592        }
1593
1594        #[tokio::test]
1595        async fn happy_path_fires_extraction() {
1596            // With enabled config and no injection flags, extraction is spawned.
1597            // MockProvider returns None (no entities), but the counter must be incremented.
1598            let provider = mock_provider(vec![]);
1599            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1600            let pool = agent
1601                .memory_state
1602                .persistence
1603                .memory
1604                .as_ref()
1605                .unwrap()
1606                .sqlite()
1607                .pool()
1608                .clone();
1609
1610            agent
1611                .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1612                .await;
1613
1614            // Wait for the spawned task to complete.
1615            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1616
1617            let store = GraphStore::new(pool);
1618            let count = store.get_metadata("extraction_count").await.unwrap();
1619            assert!(
1620                count.is_some(),
1621                "happy-path extraction must increment extraction_count"
1622            );
1623        }
1624
1625        #[tokio::test]
1626        async fn tool_result_parts_guard_skips_extraction() {
1627            // FIX-1 regression: has_tool_result_parts=true → extraction must be skipped.
1628            // Tool result messages contain raw structured output (TOML, JSON, code) — not
1629            // conversational content. Extracting entities from them produces graph noise.
1630            let provider = mock_provider(vec![]);
1631            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1632            let pool = agent
1633                .memory_state
1634                .persistence
1635                .memory
1636                .as_ref()
1637                .unwrap()
1638                .sqlite()
1639                .pool()
1640                .clone();
1641
1642            agent
1643                .enqueue_graph_extraction_task(
1644                    "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1645                    false,
1646                    true, // has_tool_result_parts
1647                )
1648                .await;
1649
1650            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1651
1652            let store = GraphStore::new(pool);
1653            let count = store.get_metadata("extraction_count").await.unwrap();
1654            assert!(
1655                count.is_none(),
1656                "tool result message must not trigger graph extraction"
1657            );
1658        }
1659
1660        #[tokio::test]
1661        async fn context_filter_excludes_tool_result_messages() {
1662            // FIX-2: context_messages must not include tool result user messages.
1663            // When enqueue_graph_extraction_task collects context, it filters out
1664            // Role::User messages that contain ToolResult parts — only conversational
1665            // user messages are included as extraction context.
1666            //
1667            // This test verifies the guard fires: a tool result message alone is passed
1668            // (has_tool_result_parts=true) → extraction is skipped entirely, so context
1669            // filtering is not exercised. We verify FIX-2 by ensuring a prior tool result
1670            // message in agent.msg.messages is excluded when a subsequent conversational message
1671            // triggers extraction.
1672            let provider = mock_provider(vec![]);
1673            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1674
1675            // Add a tool result message to the agent's message history — this simulates
1676            // a tool call response that arrived before the current conversational turn.
1677            agent.msg.messages.push(Message {
1678                role: Role::User,
1679                content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1680                parts: vec![MessagePart::ToolResult {
1681                    tool_use_id: "abc".to_owned(),
1682                    content: "provider_type = \"openai\"".to_owned(),
1683                    is_error: false,
1684                }],
1685                metadata: MessageMetadata::default(),
1686            });
1687
1688            let pool = agent
1689                .memory_state
1690                .persistence
1691                .memory
1692                .as_ref()
1693                .unwrap()
1694                .sqlite()
1695                .pool()
1696                .clone();
1697
1698            // Trigger extraction for a conversational message (not a tool result).
1699            agent
1700                .enqueue_graph_extraction_task(
1701                    "I prefer Rust for systems programming",
1702                    false,
1703                    false,
1704                )
1705                .await;
1706
1707            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1708
1709            // Extraction must have fired (conversational message, no injection flags).
1710            let store = GraphStore::new(pool);
1711            let count = store.get_metadata("extraction_count").await.unwrap();
1712            assert!(
1713                count.is_some(),
1714                "conversational message must trigger extraction even with prior tool result in history"
1715            );
1716        }
1717    }
1718
1719    // R-PERS-01: unit tests for enqueue_persona_extraction_task guard conditions.
1720    mod persona_extraction_guards {
1721        use super::*;
1722        use zeph_config::PersonaConfig;
1723        use zeph_llm::provider::MessageMetadata;
1724
1725        fn enabled_persona_config() -> PersonaConfig {
1726            PersonaConfig {
1727                enabled: true,
1728                min_messages: 1,
1729                ..PersonaConfig::default()
1730            }
1731        }
1732
1733        async fn agent_with_persona(
1734            provider: &AnyProvider,
1735            config: PersonaConfig,
1736        ) -> Agent<MockChannel> {
1737            let memory =
1738                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1739            let cid = memory.sqlite().create_conversation().await.unwrap();
1740            let mut agent = Agent::new(
1741                provider.clone(),
1742                MockChannel::new(vec![]),
1743                create_test_registry(),
1744                None,
1745                5,
1746                MockToolExecutor::no_tools(),
1747            )
1748            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1749            agent.memory_state.extraction.persona_config = config;
1750            agent
1751        }
1752
1753        #[tokio::test]
1754        async fn disabled_config_skips_spawn() {
1755            // persona.enabled=false → nothing is spawned; persona_memory stays empty.
1756            let provider = mock_provider(vec![]);
1757            let mut agent = agent_with_persona(
1758                &provider,
1759                PersonaConfig {
1760                    enabled: false,
1761                    ..PersonaConfig::default()
1762                },
1763            )
1764            .await;
1765
1766            // Inject a user message so message count is above threshold.
1767            agent.msg.messages.push(zeph_llm::provider::Message {
1768                role: Role::User,
1769                content: "I prefer Rust for systems programming".to_owned(),
1770                parts: vec![],
1771                metadata: MessageMetadata::default(),
1772            });
1773
1774            agent.enqueue_persona_extraction_task();
1775
1776            let store = agent
1777                .memory_state
1778                .persistence
1779                .memory
1780                .as_ref()
1781                .unwrap()
1782                .sqlite()
1783                .clone();
1784            let count = store.count_persona_facts().await.unwrap();
1785            assert_eq!(count, 0, "disabled persona config must not write any facts");
1786        }
1787
1788        #[tokio::test]
1789        async fn below_min_messages_skips_spawn() {
1790            // min_messages=3 but only 2 user messages → should skip.
1791            let provider = mock_provider(vec![]);
1792            let mut agent = agent_with_persona(
1793                &provider,
1794                PersonaConfig {
1795                    enabled: true,
1796                    min_messages: 3,
1797                    ..PersonaConfig::default()
1798                },
1799            )
1800            .await;
1801
1802            for text in ["I use Rust", "I prefer async code"] {
1803                agent.msg.messages.push(zeph_llm::provider::Message {
1804                    role: Role::User,
1805                    content: text.to_owned(),
1806                    parts: vec![],
1807                    metadata: MessageMetadata::default(),
1808                });
1809            }
1810
1811            agent.enqueue_persona_extraction_task();
1812
1813            let store = agent
1814                .memory_state
1815                .persistence
1816                .memory
1817                .as_ref()
1818                .unwrap()
1819                .sqlite()
1820                .clone();
1821            let count = store.count_persona_facts().await.unwrap();
1822            assert_eq!(
1823                count, 0,
1824                "below min_messages threshold must not trigger extraction"
1825            );
1826        }
1827
1828        #[tokio::test]
1829        async fn no_memory_skips_spawn() {
1830            // memory=None → must exit early without panic.
1831            let provider = mock_provider(vec![]);
1832            let channel = MockChannel::new(vec![]);
1833            let registry = create_test_registry();
1834            let executor = MockToolExecutor::no_tools();
1835            let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1836            agent.memory_state.extraction.persona_config = enabled_persona_config();
1837            agent.msg.messages.push(zeph_llm::provider::Message {
1838                role: Role::User,
1839                content: "I like Rust".to_owned(),
1840                parts: vec![],
1841                metadata: MessageMetadata::default(),
1842            });
1843
1844            // Must not panic even without memory.
1845            agent.enqueue_persona_extraction_task();
1846        }
1847
1848        #[tokio::test]
1849        async fn enabled_enough_messages_spawns_extraction() {
1850            // enabled=true, min_messages=1, self-referential message → extraction runs eagerly
1851            // (not fire-and-forget) and chat() is called on the provider, verified via MockProvider.
1852            use zeph_llm::mock::MockProvider;
1853            let (mock, recorded) = MockProvider::default().with_recording();
1854            let provider = AnyProvider::Mock(mock);
1855            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1856
1857            agent.msg.messages.push(zeph_llm::provider::Message {
1858                role: Role::User,
1859                content: "I prefer Rust for systems programming".to_owned(),
1860                parts: vec![],
1861                metadata: MessageMetadata::default(),
1862            });
1863
1864            agent.enqueue_persona_extraction_task();
1865
1866            // Persona extraction runs via BackgroundSupervisor. Wait for tasks to complete.
1867            agent.lifecycle.supervisor.join_all_for_test().await;
1868
1869            let calls = recorded.lock().unwrap();
1870            assert!(
1871                !calls.is_empty(),
1872                "happy-path: provider.chat() must be called when extraction completes"
1873            );
1874        }
1875
1876        #[tokio::test]
1877        async fn messages_capped_at_eight() {
1878            // More than 8 user messages → only 8 are passed to extraction.
1879            // Each message contains "I" so self-referential gate passes.
1880            use zeph_llm::mock::MockProvider;
1881            let (mock, recorded) = MockProvider::default().with_recording();
1882            let provider = AnyProvider::Mock(mock);
1883            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1884
1885            for i in 0..12u32 {
1886                agent.msg.messages.push(zeph_llm::provider::Message {
1887                    role: Role::User,
1888                    content: format!("I like message {i}"),
1889                    parts: vec![],
1890                    metadata: MessageMetadata::default(),
1891                });
1892            }
1893
1894            agent.enqueue_persona_extraction_task();
1895
1896            // Allow the background task to complete before asserting.
1897            agent.lifecycle.supervisor.join_all_for_test().await;
1898
1899            // Verify extraction ran (provider was called).
1900            let calls = recorded.lock().unwrap();
1901            assert!(
1902                !calls.is_empty(),
1903                "extraction must run when enough messages present"
1904            );
1905            // Verify the prompt sent to the provider does not contain messages beyond the 8th.
1906            let prompt = &calls[0];
1907            let user_text = prompt
1908                .iter()
1909                .filter(|m| m.role == Role::User)
1910                .map(|m| m.content.as_str())
1911                .collect::<Vec<_>>()
1912                .join(" ");
1913            // Messages 8..11 ("I like message 8".."I like message 11") must not appear.
1914            assert!(
1915                !user_text.contains("I like message 8"),
1916                "message index 8 must be excluded from extraction input"
1917            );
1918        }
1919
1920        #[test]
1921        fn long_message_truncated_at_char_boundary() {
1922            // Directly verify the per-message truncation logic applied in
1923            // enqueue_persona_extraction_task: a content > 2048 bytes must be capped
1924            // to exactly floor_char_boundary(2048).
1925            let long_content = "x".repeat(3000);
1926            let truncated = if long_content.len() > 2048 {
1927                long_content[..long_content.floor_char_boundary(2048)].to_owned()
1928            } else {
1929                long_content.clone()
1930            };
1931            assert_eq!(
1932                truncated.len(),
1933                2048,
1934                "ASCII content must be truncated to exactly 2048 bytes"
1935            );
1936
1937            // Multi-byte boundary: build a string whose char boundary falls before 2048.
1938            let multi = "é".repeat(1500); // each 'é' is 2 bytes → 3000 bytes total
1939            let truncated_multi = if multi.len() > 2048 {
1940                multi[..multi.floor_char_boundary(2048)].to_owned()
1941            } else {
1942                multi.clone()
1943            };
1944            assert!(
1945                truncated_multi.len() <= 2048,
1946                "multi-byte content must not exceed 2048 bytes"
1947            );
1948            assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1949        }
1950    }
1951
1952    #[tokio::test]
1953    async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1954        let provider = mock_provider(vec![]);
1955        let channel = MockChannel::new(vec![]);
1956        let registry = create_test_registry();
1957        let executor = MockToolExecutor::no_tools();
1958
1959        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1960        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1961        let cid = memory.sqlite().create_conversation().await.unwrap();
1962
1963        // autosave_assistant=false — but User role always takes embedding path
1964        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1965            .with_metrics(tx)
1966            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1967        agent.memory_state.persistence.autosave_assistant = false;
1968        agent.memory_state.persistence.autosave_min_length = 20;
1969
1970        let long_user_msg = "A".repeat(100);
1971        agent
1972            .persist_message(Role::User, &long_user_msg, &[], false)
1973            .await;
1974
1975        let history = agent
1976            .memory_state
1977            .persistence
1978            .memory
1979            .as_ref()
1980            .unwrap()
1981            .sqlite()
1982            .load_history(cid, 50)
1983            .await
1984            .unwrap();
1985        assert_eq!(history.len(), 1, "user message must be saved");
1986        // User messages go through remember_with_parts (embedding path).
1987        // sqlite_message_count must increment regardless of Qdrant availability.
1988        assert_eq!(rx.borrow().sqlite_message_count, 1);
1989    }
1990
1991    // Round-trip tests: verify that persist_message saves the correct parts and they
1992    // are restored correctly by load_history.
1993
1994    #[tokio::test]
1995    async fn persist_message_saves_correct_tool_use_parts() {
1996        use zeph_llm::provider::MessagePart;
1997
1998        let provider = mock_provider(vec![]);
1999        let channel = MockChannel::new(vec![]);
2000        let registry = create_test_registry();
2001        let executor = MockToolExecutor::no_tools();
2002
2003        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2004        let cid = memory.sqlite().create_conversation().await.unwrap();
2005
2006        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2007            std::sync::Arc::new(memory),
2008            cid,
2009            50,
2010            5,
2011            100,
2012        );
2013
2014        let parts = vec![MessagePart::ToolUse {
2015            id: "call_abc123".to_string(),
2016            name: "read_file".to_string(),
2017            input: serde_json::json!({"path": "/tmp/test.txt"}),
2018        }];
2019        let content = "[tool_use: read_file(call_abc123)]";
2020
2021        agent
2022            .persist_message(Role::Assistant, content, &parts, false)
2023            .await;
2024
2025        let history = agent
2026            .memory_state
2027            .persistence
2028            .memory
2029            .as_ref()
2030            .unwrap()
2031            .sqlite()
2032            .load_history(cid, 50)
2033            .await
2034            .unwrap();
2035
2036        assert_eq!(history.len(), 1);
2037        assert_eq!(history[0].role, Role::Assistant);
2038        assert_eq!(history[0].content, content);
2039        assert_eq!(history[0].parts.len(), 1);
2040        match &history[0].parts[0] {
2041            MessagePart::ToolUse { id, name, .. } => {
2042                assert_eq!(id, "call_abc123");
2043                assert_eq!(name, "read_file");
2044            }
2045            other => panic!("expected ToolUse part, got {other:?}"),
2046        }
2047        // Regression guard: assistant message must NOT have ToolResult parts
2048        assert!(
2049            !history[0]
2050                .parts
2051                .iter()
2052                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2053            "assistant message must not contain ToolResult parts"
2054        );
2055    }
2056
2057    #[tokio::test]
2058    async fn persist_message_saves_correct_tool_result_parts() {
2059        use zeph_llm::provider::MessagePart;
2060
2061        let provider = mock_provider(vec![]);
2062        let channel = MockChannel::new(vec![]);
2063        let registry = create_test_registry();
2064        let executor = MockToolExecutor::no_tools();
2065
2066        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2067        let cid = memory.sqlite().create_conversation().await.unwrap();
2068
2069        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2070            std::sync::Arc::new(memory),
2071            cid,
2072            50,
2073            5,
2074            100,
2075        );
2076
2077        let parts = vec![MessagePart::ToolResult {
2078            tool_use_id: "call_abc123".to_string(),
2079            content: "file contents here".to_string(),
2080            is_error: false,
2081        }];
2082        let content = "[tool_result: call_abc123]\nfile contents here";
2083
2084        agent
2085            .persist_message(Role::User, content, &parts, false)
2086            .await;
2087
2088        let history = agent
2089            .memory_state
2090            .persistence
2091            .memory
2092            .as_ref()
2093            .unwrap()
2094            .sqlite()
2095            .load_history(cid, 50)
2096            .await
2097            .unwrap();
2098
2099        assert_eq!(history.len(), 1);
2100        assert_eq!(history[0].role, Role::User);
2101        assert_eq!(history[0].content, content);
2102        assert_eq!(history[0].parts.len(), 1);
2103        match &history[0].parts[0] {
2104            MessagePart::ToolResult {
2105                tool_use_id,
2106                content: result_content,
2107                is_error,
2108            } => {
2109                assert_eq!(tool_use_id, "call_abc123");
2110                assert_eq!(result_content, "file contents here");
2111                assert!(!is_error);
2112            }
2113            other => panic!("expected ToolResult part, got {other:?}"),
2114        }
2115        // Regression guard: user message with ToolResult must NOT have ToolUse parts
2116        assert!(
2117            !history[0]
2118                .parts
2119                .iter()
2120                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2121            "user ToolResult message must not contain ToolUse parts"
2122        );
2123    }
2124
2125    #[tokio::test]
2126    async fn persist_message_roundtrip_preserves_role_part_alignment() {
2127        use zeph_llm::provider::MessagePart;
2128
2129        let provider = mock_provider(vec![]);
2130        let channel = MockChannel::new(vec![]);
2131        let registry = create_test_registry();
2132        let executor = MockToolExecutor::no_tools();
2133
2134        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2135        let cid = memory.sqlite().create_conversation().await.unwrap();
2136
2137        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2138            std::sync::Arc::new(memory),
2139            cid,
2140            50,
2141            5,
2142            100,
2143        );
2144
2145        // Persist assistant message with ToolUse parts
2146        let assistant_parts = vec![MessagePart::ToolUse {
2147            id: "id_1".to_string(),
2148            name: "list_dir".to_string(),
2149            input: serde_json::json!({"path": "/tmp"}),
2150        }];
2151        agent
2152            .persist_message(
2153                Role::Assistant,
2154                "[tool_use: list_dir(id_1)]",
2155                &assistant_parts,
2156                false,
2157            )
2158            .await;
2159
2160        // Persist user message with ToolResult parts
2161        let user_parts = vec![MessagePart::ToolResult {
2162            tool_use_id: "id_1".to_string(),
2163            content: "file1.txt\nfile2.txt".to_string(),
2164            is_error: false,
2165        }];
2166        agent
2167            .persist_message(
2168                Role::User,
2169                "[tool_result: id_1]\nfile1.txt\nfile2.txt",
2170                &user_parts,
2171                false,
2172            )
2173            .await;
2174
2175        let history = agent
2176            .memory_state
2177            .persistence
2178            .memory
2179            .as_ref()
2180            .unwrap()
2181            .sqlite()
2182            .load_history(cid, 50)
2183            .await
2184            .unwrap();
2185
2186        assert_eq!(history.len(), 2);
2187
2188        // First message: assistant + ToolUse
2189        assert_eq!(history[0].role, Role::Assistant);
2190        assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
2191        assert!(
2192            matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
2193            "first message must be assistant ToolUse"
2194        );
2195
2196        // Second message: user + ToolResult
2197        assert_eq!(history[1].role, Role::User);
2198        assert_eq!(
2199            history[1].content,
2200            "[tool_result: id_1]\nfile1.txt\nfile2.txt"
2201        );
2202        assert!(
2203            matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
2204            "second message must be user ToolResult"
2205        );
2206
2207        // Cross-role regression guard: no swapped parts
2208        assert!(
2209            !history[0]
2210                .parts
2211                .iter()
2212                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2213            "assistant message must not have ToolResult parts"
2214        );
2215        assert!(
2216            !history[1]
2217                .parts
2218                .iter()
2219                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2220            "user message must not have ToolUse parts"
2221        );
2222    }
2223
2224    #[tokio::test]
2225    async fn persist_message_saves_correct_tool_output_parts() {
2226        use zeph_llm::provider::MessagePart;
2227
2228        let provider = mock_provider(vec![]);
2229        let channel = MockChannel::new(vec![]);
2230        let registry = create_test_registry();
2231        let executor = MockToolExecutor::no_tools();
2232
2233        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2234        let cid = memory.sqlite().create_conversation().await.unwrap();
2235
2236        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2237            std::sync::Arc::new(memory),
2238            cid,
2239            50,
2240            5,
2241            100,
2242        );
2243
2244        let parts = vec![MessagePart::ToolOutput {
2245            tool_name: "shell".into(),
2246            body: "hello from shell".to_string(),
2247            compacted_at: None,
2248        }];
2249        let content = "[tool: shell]\nhello from shell";
2250
2251        agent
2252            .persist_message(Role::User, content, &parts, false)
2253            .await;
2254
2255        let history = agent
2256            .memory_state
2257            .persistence
2258            .memory
2259            .as_ref()
2260            .unwrap()
2261            .sqlite()
2262            .load_history(cid, 50)
2263            .await
2264            .unwrap();
2265
2266        assert_eq!(history.len(), 1);
2267        assert_eq!(history[0].role, Role::User);
2268        assert_eq!(history[0].content, content);
2269        assert_eq!(history[0].parts.len(), 1);
2270        match &history[0].parts[0] {
2271            MessagePart::ToolOutput {
2272                tool_name,
2273                body,
2274                compacted_at,
2275            } => {
2276                assert_eq!(tool_name, "shell");
2277                assert_eq!(body, "hello from shell");
2278                assert!(compacted_at.is_none());
2279            }
2280            other => panic!("expected ToolOutput part, got {other:?}"),
2281        }
2282    }
2283
2284    // --- sanitize_tool_pairs unit tests ---
2285
2286    #[tokio::test]
2287    async fn load_history_removes_trailing_orphan_tool_use() {
2288        use zeph_llm::provider::MessagePart;
2289
2290        let provider = mock_provider(vec![]);
2291        let channel = MockChannel::new(vec![]);
2292        let registry = create_test_registry();
2293        let executor = MockToolExecutor::no_tools();
2294
2295        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2296        let cid = memory.sqlite().create_conversation().await.unwrap();
2297        let sqlite = memory.sqlite();
2298
2299        // user message (normal)
2300        sqlite
2301            .save_message(cid, "user", "do something with a tool")
2302            .await
2303            .unwrap();
2304
2305        // assistant message with ToolUse parts — no following tool_result (orphan)
2306        let parts = serde_json::to_string(&[MessagePart::ToolUse {
2307            id: "call_orphan".to_string(),
2308            name: "shell".to_string(),
2309            input: serde_json::json!({"command": "ls"}),
2310        }])
2311        .unwrap();
2312        sqlite
2313            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2314            .await
2315            .unwrap();
2316
2317        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2318            std::sync::Arc::new(memory),
2319            cid,
2320            50,
2321            5,
2322            100,
2323        );
2324
2325        let messages_before = agent.msg.messages.len();
2326        agent.load_history().await.unwrap();
2327
2328        // Only the user message should be loaded; orphaned assistant tool_use removed.
2329        assert_eq!(
2330            agent.msg.messages.len(),
2331            messages_before + 1,
2332            "orphaned trailing tool_use must be removed"
2333        );
2334        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2335    }
2336
2337    #[tokio::test]
2338    async fn load_history_removes_leading_orphan_tool_result() {
2339        use zeph_llm::provider::MessagePart;
2340
2341        let provider = mock_provider(vec![]);
2342        let channel = MockChannel::new(vec![]);
2343        let registry = create_test_registry();
2344        let executor = MockToolExecutor::no_tools();
2345
2346        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2347        let cid = memory.sqlite().create_conversation().await.unwrap();
2348        let sqlite = memory.sqlite();
2349
2350        // Leading orphan: user message with ToolResult but no preceding tool_use
2351        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2352            tool_use_id: "call_missing".to_string(),
2353            content: "result data".to_string(),
2354            is_error: false,
2355        }])
2356        .unwrap();
2357        sqlite
2358            .save_message_with_parts(
2359                cid,
2360                "user",
2361                "[tool_result: call_missing]\nresult data",
2362                &result_parts,
2363            )
2364            .await
2365            .unwrap();
2366
2367        // A valid assistant reply after the orphan
2368        sqlite
2369            .save_message(cid, "assistant", "here is my response")
2370            .await
2371            .unwrap();
2372
2373        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2374            std::sync::Arc::new(memory),
2375            cid,
2376            50,
2377            5,
2378            100,
2379        );
2380
2381        let messages_before = agent.msg.messages.len();
2382        agent.load_history().await.unwrap();
2383
2384        // Orphaned leading tool_result removed; only assistant message kept.
2385        assert_eq!(
2386            agent.msg.messages.len(),
2387            messages_before + 1,
2388            "orphaned leading tool_result must be removed"
2389        );
2390        assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2391    }
2392
2393    #[tokio::test]
2394    async fn load_history_preserves_complete_tool_pairs() {
2395        use zeph_llm::provider::MessagePart;
2396
2397        let provider = mock_provider(vec![]);
2398        let channel = MockChannel::new(vec![]);
2399        let registry = create_test_registry();
2400        let executor = MockToolExecutor::no_tools();
2401
2402        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2403        let cid = memory.sqlite().create_conversation().await.unwrap();
2404        let sqlite = memory.sqlite();
2405
2406        // Complete tool_use / tool_result pair
2407        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2408            id: "call_ok".to_string(),
2409            name: "shell".to_string(),
2410            input: serde_json::json!({"command": "pwd"}),
2411        }])
2412        .unwrap();
2413        sqlite
2414            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2415            .await
2416            .unwrap();
2417
2418        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2419            tool_use_id: "call_ok".to_string(),
2420            content: "/home/user".to_string(),
2421            is_error: false,
2422        }])
2423        .unwrap();
2424        sqlite
2425            .save_message_with_parts(
2426                cid,
2427                "user",
2428                "[tool_result: call_ok]\n/home/user",
2429                &result_parts,
2430            )
2431            .await
2432            .unwrap();
2433
2434        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2435            std::sync::Arc::new(memory),
2436            cid,
2437            50,
2438            5,
2439            100,
2440        );
2441
2442        let messages_before = agent.msg.messages.len();
2443        agent.load_history().await.unwrap();
2444
2445        // Both messages must be preserved.
2446        assert_eq!(
2447            agent.msg.messages.len(),
2448            messages_before + 2,
2449            "complete tool_use/tool_result pair must be preserved"
2450        );
2451        assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2452        assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2453    }
2454
2455    #[tokio::test]
2456    async fn load_history_handles_multiple_trailing_orphans() {
2457        use zeph_llm::provider::MessagePart;
2458
2459        let provider = mock_provider(vec![]);
2460        let channel = MockChannel::new(vec![]);
2461        let registry = create_test_registry();
2462        let executor = MockToolExecutor::no_tools();
2463
2464        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2465        let cid = memory.sqlite().create_conversation().await.unwrap();
2466        let sqlite = memory.sqlite();
2467
2468        // Normal user message
2469        sqlite.save_message(cid, "user", "start").await.unwrap();
2470
2471        // First orphaned tool_use
2472        let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2473            id: "call_1".to_string(),
2474            name: "shell".to_string(),
2475            input: serde_json::json!({}),
2476        }])
2477        .unwrap();
2478        sqlite
2479            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2480            .await
2481            .unwrap();
2482
2483        // Second orphaned tool_use (consecutive, no tool_result between them)
2484        let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2485            id: "call_2".to_string(),
2486            name: "read_file".to_string(),
2487            input: serde_json::json!({}),
2488        }])
2489        .unwrap();
2490        sqlite
2491            .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2492            .await
2493            .unwrap();
2494
2495        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2496            std::sync::Arc::new(memory),
2497            cid,
2498            50,
2499            5,
2500            100,
2501        );
2502
2503        let messages_before = agent.msg.messages.len();
2504        agent.load_history().await.unwrap();
2505
2506        // Both orphaned tool_use messages removed; only the user message kept.
2507        assert_eq!(
2508            agent.msg.messages.len(),
2509            messages_before + 1,
2510            "all trailing orphaned tool_use messages must be removed"
2511        );
2512        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2513    }
2514
2515    #[tokio::test]
2516    async fn load_history_no_tool_messages_unchanged() {
2517        let provider = mock_provider(vec![]);
2518        let channel = MockChannel::new(vec![]);
2519        let registry = create_test_registry();
2520        let executor = MockToolExecutor::no_tools();
2521
2522        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2523        let cid = memory.sqlite().create_conversation().await.unwrap();
2524        let sqlite = memory.sqlite();
2525
2526        sqlite.save_message(cid, "user", "hello").await.unwrap();
2527        sqlite
2528            .save_message(cid, "assistant", "hi there")
2529            .await
2530            .unwrap();
2531        sqlite
2532            .save_message(cid, "user", "how are you?")
2533            .await
2534            .unwrap();
2535
2536        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2537            std::sync::Arc::new(memory),
2538            cid,
2539            50,
2540            5,
2541            100,
2542        );
2543
2544        let messages_before = agent.msg.messages.len();
2545        agent.load_history().await.unwrap();
2546
2547        // All three plain messages must be preserved.
2548        assert_eq!(
2549            agent.msg.messages.len(),
2550            messages_before + 3,
2551            "plain messages without tool parts must pass through unchanged"
2552        );
2553    }
2554
2555    #[tokio::test]
2556    async fn load_history_removes_both_leading_and_trailing_orphans() {
2557        use zeph_llm::provider::MessagePart;
2558
2559        let provider = mock_provider(vec![]);
2560        let channel = MockChannel::new(vec![]);
2561        let registry = create_test_registry();
2562        let executor = MockToolExecutor::no_tools();
2563
2564        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2565        let cid = memory.sqlite().create_conversation().await.unwrap();
2566        let sqlite = memory.sqlite();
2567
2568        // Leading orphan: user message with ToolResult, no preceding tool_use
2569        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2570            tool_use_id: "call_leading".to_string(),
2571            content: "orphaned result".to_string(),
2572            is_error: false,
2573        }])
2574        .unwrap();
2575        sqlite
2576            .save_message_with_parts(
2577                cid,
2578                "user",
2579                "[tool_result: call_leading]\norphaned result",
2580                &result_parts,
2581            )
2582            .await
2583            .unwrap();
2584
2585        // Valid middle messages
2586        sqlite
2587            .save_message(cid, "user", "what is 2+2?")
2588            .await
2589            .unwrap();
2590        sqlite.save_message(cid, "assistant", "4").await.unwrap();
2591
2592        // Trailing orphan: assistant message with ToolUse, no following tool_result
2593        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2594            id: "call_trailing".to_string(),
2595            name: "shell".to_string(),
2596            input: serde_json::json!({"command": "date"}),
2597        }])
2598        .unwrap();
2599        sqlite
2600            .save_message_with_parts(
2601                cid,
2602                "assistant",
2603                "[tool_use: shell(call_trailing)]",
2604                &use_parts,
2605            )
2606            .await
2607            .unwrap();
2608
2609        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2610            std::sync::Arc::new(memory),
2611            cid,
2612            50,
2613            5,
2614            100,
2615        );
2616
2617        let messages_before = agent.msg.messages.len();
2618        agent.load_history().await.unwrap();
2619
2620        // Both orphans removed; only the 2 valid middle messages kept.
2621        assert_eq!(
2622            agent.msg.messages.len(),
2623            messages_before + 2,
2624            "both leading and trailing orphans must be removed"
2625        );
2626        assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2627        assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2628        assert_eq!(
2629            agent.msg.messages[messages_before + 1].role,
2630            Role::Assistant
2631        );
2632        assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2633    }
2634
2635    /// RC1 regression: mid-history assistant[`ToolUse`] without a following user[`ToolResult`]
2636    /// must have its `ToolUse` parts stripped (text preserved). The message count stays the same
2637    /// because the assistant message has a text content fallback; only `ToolUse` parts are
2638    /// removed.
2639    #[tokio::test]
2640    async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2641        use zeph_llm::provider::MessagePart;
2642
2643        let provider = mock_provider(vec![]);
2644        let channel = MockChannel::new(vec![]);
2645        let registry = create_test_registry();
2646        let executor = MockToolExecutor::no_tools();
2647
2648        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2649        let cid = memory.sqlite().create_conversation().await.unwrap();
2650        let sqlite = memory.sqlite();
2651
2652        // Normal first exchange.
2653        sqlite
2654            .save_message(cid, "user", "first question")
2655            .await
2656            .unwrap();
2657        sqlite
2658            .save_message(cid, "assistant", "first answer")
2659            .await
2660            .unwrap();
2661
2662        // Mid-history orphan: assistant with ToolUse but NO following ToolResult user message.
2663        // This models the compaction-split scenario (RC2) where replace_conversation hid the
2664        // user[ToolResult] but left the assistant[ToolUse] visible.
2665        let use_parts = serde_json::to_string(&[
2666            MessagePart::ToolUse {
2667                id: "call_mid_1".to_string(),
2668                name: "shell".to_string(),
2669                input: serde_json::json!({"command": "ls"}),
2670            },
2671            MessagePart::Text {
2672                text: "Let me check the files.".to_string(),
2673            },
2674        ])
2675        .unwrap();
2676        sqlite
2677            .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2678            .await
2679            .unwrap();
2680
2681        // Another normal exchange after the orphan.
2682        sqlite
2683            .save_message(cid, "user", "second question")
2684            .await
2685            .unwrap();
2686        sqlite
2687            .save_message(cid, "assistant", "second answer")
2688            .await
2689            .unwrap();
2690
2691        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2692            std::sync::Arc::new(memory),
2693            cid,
2694            50,
2695            5,
2696            100,
2697        );
2698
2699        let messages_before = agent.msg.messages.len();
2700        agent.load_history().await.unwrap();
2701
2702        // All 5 messages remain (orphan message kept because it has text), but the orphaned
2703        // message must have its ToolUse parts stripped.
2704        assert_eq!(
2705            agent.msg.messages.len(),
2706            messages_before + 5,
2707            "message count must be 5 (orphan message kept — has text content)"
2708        );
2709
2710        // The orphaned assistant message (index 2 in the loaded slice) must have no ToolUse parts.
2711        let orphan = &agent.msg.messages[messages_before + 2];
2712        assert_eq!(orphan.role, Role::Assistant);
2713        assert!(
2714            !orphan
2715                .parts
2716                .iter()
2717                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2718            "orphaned ToolUse parts must be stripped from mid-history message"
2719        );
2720        // Text part must be preserved.
2721        assert!(
2722            orphan.parts.iter().any(
2723                |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2724            ),
2725            "text content of orphaned assistant message must be preserved"
2726        );
2727    }
2728
2729    /// RC3 regression: a user message with empty `content` but non-empty `parts` (`ToolResult`)
2730    /// must NOT be skipped by `load_history`. Previously the empty-content check dropped these
2731    /// messages before `sanitize_tool_pairs` ran, leaving the preceding assistant `ToolUse`
2732    /// orphaned.
2733    #[tokio::test]
2734    async fn load_history_keeps_tool_only_user_message() {
2735        use zeph_llm::provider::MessagePart;
2736
2737        let provider = mock_provider(vec![]);
2738        let channel = MockChannel::new(vec![]);
2739        let registry = create_test_registry();
2740        let executor = MockToolExecutor::no_tools();
2741
2742        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2743        let cid = memory.sqlite().create_conversation().await.unwrap();
2744        let sqlite = memory.sqlite();
2745
2746        // Assistant sends a ToolUse.
2747        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2748            id: "call_rc3".to_string(),
2749            name: "memory_save".to_string(),
2750            input: serde_json::json!({"content": "something"}),
2751        }])
2752        .unwrap();
2753        sqlite
2754            .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2755            .await
2756            .unwrap();
2757
2758        // User message has empty text content but carries ToolResult in parts — native tool pattern.
2759        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2760            tool_use_id: "call_rc3".to_string(),
2761            content: "saved".to_string(),
2762            is_error: false,
2763        }])
2764        .unwrap();
2765        sqlite
2766            .save_message_with_parts(cid, "user", "", &result_parts)
2767            .await
2768            .unwrap();
2769
2770        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2771
2772        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2773            std::sync::Arc::new(memory),
2774            cid,
2775            50,
2776            5,
2777            100,
2778        );
2779
2780        let messages_before = agent.msg.messages.len();
2781        agent.load_history().await.unwrap();
2782
2783        // All 3 messages must be loaded — the empty-content ToolResult user message must NOT be
2784        // dropped.
2785        assert_eq!(
2786            agent.msg.messages.len(),
2787            messages_before + 3,
2788            "user message with empty content but ToolResult parts must not be dropped"
2789        );
2790
2791        // The user message at index 1 must still carry the ToolResult part.
2792        let user_msg = &agent.msg.messages[messages_before + 1];
2793        assert_eq!(user_msg.role, Role::User);
2794        assert!(
2795            user_msg.parts.iter().any(
2796                |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2797            ),
2798            "ToolResult part must be preserved on user message with empty content"
2799        );
2800    }
2801
2802    /// RC2 reverse pass: a user message with a `ToolResult` whose `tool_use_id` has no matching
2803    /// `ToolUse` in the preceding assistant message must be stripped by
2804    /// `strip_mid_history_orphans`.
2805    #[tokio::test]
2806    async fn strip_orphans_removes_orphaned_tool_result() {
2807        use zeph_llm::provider::MessagePart;
2808
2809        let provider = mock_provider(vec![]);
2810        let channel = MockChannel::new(vec![]);
2811        let registry = create_test_registry();
2812        let executor = MockToolExecutor::no_tools();
2813
2814        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2815        let cid = memory.sqlite().create_conversation().await.unwrap();
2816        let sqlite = memory.sqlite();
2817
2818        // Normal exchange before the orphan.
2819        sqlite.save_message(cid, "user", "hello").await.unwrap();
2820        sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2821
2822        // Assistant message that does NOT contain a ToolUse.
2823        sqlite
2824            .save_message(cid, "assistant", "plain answer")
2825            .await
2826            .unwrap();
2827
2828        // User message references a tool_use_id that was never sent by the preceding assistant.
2829        let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2830            tool_use_id: "call_nonexistent".to_string(),
2831            content: "stale result".to_string(),
2832            is_error: false,
2833        }])
2834        .unwrap();
2835        sqlite
2836            .save_message_with_parts(
2837                cid,
2838                "user",
2839                "[tool_result: call_nonexistent]\nstale result",
2840                &orphan_result_parts,
2841            )
2842            .await
2843            .unwrap();
2844
2845        sqlite
2846            .save_message(cid, "assistant", "final")
2847            .await
2848            .unwrap();
2849
2850        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2851            std::sync::Arc::new(memory),
2852            cid,
2853            50,
2854            5,
2855            100,
2856        );
2857
2858        let messages_before = agent.msg.messages.len();
2859        agent.load_history().await.unwrap();
2860
2861        // The orphaned ToolResult part must have been stripped from the user message.
2862        // The user message itself may be removed (parts empty + content non-empty) or kept with
2863        // the text content — but it must NOT retain the orphaned ToolResult part.
2864        let loaded = &agent.msg.messages[messages_before..];
2865        for msg in loaded {
2866            assert!(
2867                !msg.parts.iter().any(|p| matches!(
2868                    p,
2869                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2870                )),
2871                "orphaned ToolResult part must be stripped from history"
2872            );
2873        }
2874    }
2875
2876    /// RC2 reverse pass: a complete `tool_use` + `tool_result` pair must pass through the reverse
2877    /// orphan check intact; the fix must not strip valid `ToolResult` parts.
2878    #[tokio::test]
2879    async fn strip_orphans_keeps_complete_pair() {
2880        use zeph_llm::provider::MessagePart;
2881
2882        let provider = mock_provider(vec![]);
2883        let channel = MockChannel::new(vec![]);
2884        let registry = create_test_registry();
2885        let executor = MockToolExecutor::no_tools();
2886
2887        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2888        let cid = memory.sqlite().create_conversation().await.unwrap();
2889        let sqlite = memory.sqlite();
2890
2891        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2892            id: "call_valid".to_string(),
2893            name: "shell".to_string(),
2894            input: serde_json::json!({"command": "ls"}),
2895        }])
2896        .unwrap();
2897        sqlite
2898            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2899            .await
2900            .unwrap();
2901
2902        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2903            tool_use_id: "call_valid".to_string(),
2904            content: "file.rs".to_string(),
2905            is_error: false,
2906        }])
2907        .unwrap();
2908        sqlite
2909            .save_message_with_parts(cid, "user", "", &result_parts)
2910            .await
2911            .unwrap();
2912
2913        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2914            std::sync::Arc::new(memory),
2915            cid,
2916            50,
2917            5,
2918            100,
2919        );
2920
2921        let messages_before = agent.msg.messages.len();
2922        agent.load_history().await.unwrap();
2923
2924        assert_eq!(
2925            agent.msg.messages.len(),
2926            messages_before + 2,
2927            "complete tool_use/tool_result pair must be preserved"
2928        );
2929
2930        let user_msg = &agent.msg.messages[messages_before + 1];
2931        assert!(
2932            user_msg.parts.iter().any(|p| matches!(
2933                p,
2934                MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2935            )),
2936            "ToolResult part for a matched tool_use must not be stripped"
2937        );
2938    }
2939
2940    /// RC2 reverse pass: history with a mix of complete pairs and orphaned `ToolResult` messages.
2941    /// Orphaned `ToolResult` parts must be stripped; complete pairs must pass through intact.
2942    #[tokio::test]
2943    async fn strip_orphans_mixed_history() {
2944        use zeph_llm::provider::MessagePart;
2945
2946        let provider = mock_provider(vec![]);
2947        let channel = MockChannel::new(vec![]);
2948        let registry = create_test_registry();
2949        let executor = MockToolExecutor::no_tools();
2950
2951        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2952        let cid = memory.sqlite().create_conversation().await.unwrap();
2953        let sqlite = memory.sqlite();
2954
2955        // First: complete tool_use / tool_result pair.
2956        let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2957            id: "call_good".to_string(),
2958            name: "shell".to_string(),
2959            input: serde_json::json!({"command": "pwd"}),
2960        }])
2961        .unwrap();
2962        sqlite
2963            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2964            .await
2965            .unwrap();
2966
2967        let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2968            tool_use_id: "call_good".to_string(),
2969            content: "/home".to_string(),
2970            is_error: false,
2971        }])
2972        .unwrap();
2973        sqlite
2974            .save_message_with_parts(cid, "user", "", &result_parts_ok)
2975            .await
2976            .unwrap();
2977
2978        // Second: plain assistant message followed by an orphaned ToolResult user message.
2979        sqlite
2980            .save_message(cid, "assistant", "text only")
2981            .await
2982            .unwrap();
2983
2984        let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2985            tool_use_id: "call_ghost".to_string(),
2986            content: "ghost result".to_string(),
2987            is_error: false,
2988        }])
2989        .unwrap();
2990        sqlite
2991            .save_message_with_parts(
2992                cid,
2993                "user",
2994                "[tool_result: call_ghost]\nghost result",
2995                &orphan_parts,
2996            )
2997            .await
2998            .unwrap();
2999
3000        sqlite
3001            .save_message(cid, "assistant", "final reply")
3002            .await
3003            .unwrap();
3004
3005        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3006            std::sync::Arc::new(memory),
3007            cid,
3008            50,
3009            5,
3010            100,
3011        );
3012
3013        let messages_before = agent.msg.messages.len();
3014        agent.load_history().await.unwrap();
3015
3016        let loaded = &agent.msg.messages[messages_before..];
3017
3018        // The orphaned ToolResult part must not appear in any message.
3019        for msg in loaded {
3020            assert!(
3021                !msg.parts.iter().any(|p| matches!(
3022                    p,
3023                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
3024                )),
3025                "orphaned ToolResult (call_ghost) must be stripped from history"
3026            );
3027        }
3028
3029        // The matched ToolResult must still be present on the user message following the
3030        // first assistant message.
3031        let has_good_result = loaded.iter().any(|msg| {
3032            msg.role == Role::User
3033                && msg.parts.iter().any(|p| {
3034                    matches!(
3035                        p,
3036                        MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
3037                    )
3038                })
3039        });
3040        assert!(
3041            has_good_result,
3042            "matched ToolResult (call_good) must be preserved in history"
3043        );
3044    }
3045
3046    /// Regression: a properly matched `tool_use`/`tool_result` pair must NOT be touched by the
3047    /// mid-history scan — ensures the fix doesn't break valid tool exchanges.
3048    #[tokio::test]
3049    async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
3050        use zeph_llm::provider::MessagePart;
3051
3052        let provider = mock_provider(vec![]);
3053        let channel = MockChannel::new(vec![]);
3054        let registry = create_test_registry();
3055        let executor = MockToolExecutor::no_tools();
3056
3057        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3058        let cid = memory.sqlite().create_conversation().await.unwrap();
3059        let sqlite = memory.sqlite();
3060
3061        sqlite
3062            .save_message(cid, "user", "run a command")
3063            .await
3064            .unwrap();
3065
3066        // Assistant sends a ToolUse.
3067        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3068            id: "call_ok".to_string(),
3069            name: "shell".to_string(),
3070            input: serde_json::json!({"command": "echo hi"}),
3071        }])
3072        .unwrap();
3073        sqlite
3074            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
3075            .await
3076            .unwrap();
3077
3078        // Matching user ToolResult follows.
3079        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3080            tool_use_id: "call_ok".to_string(),
3081            content: "hi".to_string(),
3082            is_error: false,
3083        }])
3084        .unwrap();
3085        sqlite
3086            .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
3087            .await
3088            .unwrap();
3089
3090        sqlite.save_message(cid, "assistant", "done").await.unwrap();
3091
3092        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3093            std::sync::Arc::new(memory),
3094            cid,
3095            50,
3096            5,
3097            100,
3098        );
3099
3100        let messages_before = agent.msg.messages.len();
3101        agent.load_history().await.unwrap();
3102
3103        // All 4 messages preserved, tool_use parts intact.
3104        assert_eq!(
3105            agent.msg.messages.len(),
3106            messages_before + 4,
3107            "matched tool pair must not be removed"
3108        );
3109        let tool_msg = &agent.msg.messages[messages_before + 1];
3110        assert!(
3111            tool_msg
3112                .parts
3113                .iter()
3114                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
3115            "matched ToolUse parts must be preserved"
3116        );
3117    }
3118
3119    /// RC5: `persist_cancelled_tool_results` must persist a tombstone user message containing
3120    /// `is_error=true` `ToolResult` parts for all `tool_calls` IDs so the preceding assistant
3121    /// `ToolUse` is never orphaned in the DB after a cancellation.
3122    #[tokio::test]
3123    async fn persist_cancelled_tool_results_pairs_tool_use() {
3124        use zeph_llm::provider::MessagePart;
3125
3126        let provider = mock_provider(vec![]);
3127        let channel = MockChannel::new(vec![]);
3128        let registry = create_test_registry();
3129        let executor = MockToolExecutor::no_tools();
3130
3131        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3132        let cid = memory.sqlite().create_conversation().await.unwrap();
3133
3134        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3135            std::sync::Arc::new(memory),
3136            cid,
3137            50,
3138            5,
3139            100,
3140        );
3141
3142        // Simulate: assistant message with two ToolUse parts already persisted.
3143        let tool_calls = vec![
3144            zeph_llm::provider::ToolUseRequest {
3145                id: "cancel_id_1".to_string(),
3146                name: "shell".to_string().into(),
3147                input: serde_json::json!({}),
3148            },
3149            zeph_llm::provider::ToolUseRequest {
3150                id: "cancel_id_2".to_string(),
3151                name: "read_file".to_string().into(),
3152                input: serde_json::json!({}),
3153            },
3154        ];
3155
3156        agent.persist_cancelled_tool_results(&tool_calls).await;
3157
3158        let history = agent
3159            .memory_state
3160            .persistence
3161            .memory
3162            .as_ref()
3163            .unwrap()
3164            .sqlite()
3165            .load_history(cid, 50)
3166            .await
3167            .unwrap();
3168
3169        // Exactly one user message must have been persisted.
3170        assert_eq!(history.len(), 1);
3171        assert_eq!(history[0].role, Role::User);
3172
3173        // It must contain is_error=true ToolResult for each tool call ID.
3174        for tc in &tool_calls {
3175            assert!(
3176                history[0].parts.iter().any(|p| matches!(
3177                    p,
3178                    MessagePart::ToolResult { tool_use_id, is_error, .. }
3179                        if tool_use_id == &tc.id && *is_error
3180                )),
3181                "tombstone ToolResult for {} must be present and is_error=true",
3182                tc.id
3183            );
3184        }
3185    }
3186
3187    // ---- has_meaningful_content unit tests ----
3188
3189    #[test]
3190    fn meaningful_content_empty_string() {
3191        assert!(!has_meaningful_content(""));
3192    }
3193
3194    #[test]
3195    fn meaningful_content_whitespace_only() {
3196        assert!(!has_meaningful_content("   \n\t  "));
3197    }
3198
3199    #[test]
3200    fn meaningful_content_tool_use_only() {
3201        assert!(!has_meaningful_content("[tool_use: shell(call_1)]"));
3202    }
3203
3204    #[test]
3205    fn meaningful_content_tool_use_no_parens() {
3206        // Format produced when tool_use is stored without explicit id parens.
3207        assert!(!has_meaningful_content("[tool_use: memory_save]"));
3208    }
3209
3210    #[test]
3211    fn meaningful_content_tool_result_with_body() {
3212        assert!(!has_meaningful_content(
3213            "[tool_result: call_1]\nsome output here"
3214        ));
3215    }
3216
3217    #[test]
3218    fn meaningful_content_tool_result_empty_body() {
3219        assert!(!has_meaningful_content("[tool_result: call_1]\n"));
3220    }
3221
3222    #[test]
3223    fn meaningful_content_tool_output_inline() {
3224        assert!(!has_meaningful_content("[tool output: bash] some result"));
3225    }
3226
3227    #[test]
3228    fn meaningful_content_tool_output_pruned() {
3229        assert!(!has_meaningful_content("[tool output: bash] (pruned)"));
3230    }
3231
3232    #[test]
3233    fn meaningful_content_tool_output_fenced() {
3234        assert!(!has_meaningful_content(
3235            "[tool output: bash]\n```\nls output\n```"
3236        ));
3237    }
3238
3239    #[test]
3240    fn meaningful_content_multiple_tool_use_tags() {
3241        assert!(!has_meaningful_content(
3242            "[tool_use: bash(id1)][tool_use: read(id2)]"
3243        ));
3244    }
3245
3246    #[test]
3247    fn meaningful_content_multiple_tool_use_tags_space_separator() {
3248        // Space between tags is not meaningful content.
3249        assert!(!has_meaningful_content(
3250            "[tool_use: bash(id1)] [tool_use: read(id2)]"
3251        ));
3252    }
3253
3254    #[test]
3255    fn meaningful_content_multiple_tool_use_tags_newline_separator() {
3256        // Newline-only separator is also not meaningful.
3257        assert!(!has_meaningful_content(
3258            "[tool_use: bash(id1)]\n[tool_use: read(id2)]"
3259        ));
3260    }
3261
3262    #[test]
3263    fn meaningful_content_tool_result_followed_by_tool_use() {
3264        // Two tags in sequence — no real text between them.
3265        assert!(!has_meaningful_content(
3266            "[tool_result: call_1]\nresult\n[tool_use: bash(call_2)]"
3267        ));
3268    }
3269
3270    #[test]
3271    fn meaningful_content_real_text_only() {
3272        assert!(has_meaningful_content("Hello, how can I help you?"));
3273    }
3274
3275    #[test]
3276    fn meaningful_content_text_before_tool_tag() {
3277        assert!(has_meaningful_content("Let me check. [tool_use: bash(id)]"));
3278    }
3279
3280    #[test]
3281    fn meaningful_content_text_after_tool_use_tag() {
3282        // Text appearing after a [tool_use: name] tag (without parens) is a JSON body
3283        // in the request-builder format — but since that format never reaches the DB,
3284        // this test verifies conservative behavior: the helper returns true (do not delete).
3285        assert!(has_meaningful_content("[tool_use: bash] I ran the command"));
3286    }
3287
3288    #[test]
3289    fn meaningful_content_text_between_tags() {
3290        assert!(has_meaningful_content(
3291            "[tool_use: bash(id1)]\nand then\n[tool_use: read(id2)]"
3292        ));
3293    }
3294
3295    #[test]
3296    fn meaningful_content_malformed_tag_no_closing_bracket() {
3297        // Conservative: malformed tag must not trigger delete.
3298        assert!(has_meaningful_content("[tool_use: "));
3299    }
3300
3301    #[test]
3302    fn meaningful_content_tool_use_and_tool_result_only() {
3303        // Typical persisted assistant+user pair content with no extra text.
3304        assert!(!has_meaningful_content(
3305            "[tool_use: memory_save(call_abc)]\n[tool_result: call_abc]\nsaved"
3306        ));
3307    }
3308
3309    #[test]
3310    fn meaningful_content_tool_result_body_with_json_array() {
3311        assert!(!has_meaningful_content(
3312            "[tool_result: id1]\n[\"array\", \"value\"]"
3313        ));
3314    }
3315
3316    // ---- Integration tests for the #2529 fix: soft-delete of legacy-content orphans ----
3317
3318    /// #2529 regression: orphaned assistant `ToolUse` + user `ToolResult` pair where BOTH messages
3319    /// have content consisting solely of legacy tool bracket strings (no human-readable text).
3320    ///
3321    /// Before the fix, `content.trim().is_empty()` returned false for these messages, so they
3322    /// were never soft-deleted and the WARN log repeated on every session restart.
3323    ///
3324    /// After the fix, `has_meaningful_content` returns false for legacy-only content, so both
3325    /// orphaned messages are soft-deleted (non-null `deleted_at`) in `SQLite`.
3326    #[tokio::test]
3327    async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
3328        use zeph_llm::provider::MessagePart;
3329
3330        let provider = mock_provider(vec![]);
3331        let channel = MockChannel::new(vec![]);
3332        let registry = create_test_registry();
3333        let executor = MockToolExecutor::no_tools();
3334
3335        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3336        let cid = memory.sqlite().create_conversation().await.unwrap();
3337        let sqlite = memory.sqlite();
3338
3339        // A normal user message that anchors the conversation.
3340        sqlite
3341            .save_message(cid, "user", "save this for me")
3342            .await
3343            .unwrap();
3344
3345        // Orphaned assistant[ToolUse]: content is ONLY a legacy tool tag — no matching
3346        // ToolResult follows. This is the exact pattern that triggered #2529.
3347        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3348            id: "call_2529".to_string(),
3349            name: "memory_save".to_string(),
3350            input: serde_json::json!({"content": "save this"}),
3351        }])
3352        .unwrap();
3353        let orphan_assistant_id = sqlite
3354            .save_message_with_parts(
3355                cid,
3356                "assistant",
3357                "[tool_use: memory_save(call_2529)]",
3358                &use_parts,
3359            )
3360            .await
3361            .unwrap();
3362
3363        // Orphaned user[ToolResult]: content is ONLY a legacy tool tag + body.
3364        // Its tool_use_id ("call_2529") does not match any ToolUse in the preceding assistant
3365        // message in this position (will be made orphaned by inserting a plain assistant message
3366        // between them to break the pair).
3367        sqlite
3368            .save_message(cid, "assistant", "here is a plain reply")
3369            .await
3370            .unwrap();
3371
3372        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3373            tool_use_id: "call_2529".to_string(),
3374            content: "saved".to_string(),
3375            is_error: false,
3376        }])
3377        .unwrap();
3378        let orphan_user_id = sqlite
3379            .save_message_with_parts(
3380                cid,
3381                "user",
3382                "[tool_result: call_2529]\nsaved",
3383                &result_parts,
3384            )
3385            .await
3386            .unwrap();
3387
3388        // Final plain message so history doesn't end on the orphan.
3389        sqlite.save_message(cid, "assistant", "done").await.unwrap();
3390
3391        let memory_arc = std::sync::Arc::new(memory);
3392        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3393            memory_arc.clone(),
3394            cid,
3395            50,
3396            5,
3397            100,
3398        );
3399
3400        agent.load_history().await.unwrap();
3401
3402        // Verify that both orphaned messages now have `deleted_at IS NOT NULL` in SQLite.
3403        // COUNT(*) WHERE deleted_at IS NOT NULL returns 1 if soft-deleted, 0 otherwise.
3404        let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3405            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3406        )
3407        .bind(orphan_assistant_id)
3408        .fetch_all(memory_arc.sqlite().pool())
3409        .await
3410        .unwrap();
3411
3412        let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3413            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3414        )
3415        .bind(orphan_user_id)
3416        .fetch_all(memory_arc.sqlite().pool())
3417        .await
3418        .unwrap();
3419
3420        assert_eq!(
3421            assistant_deleted_count.first().copied().unwrap_or(0),
3422            1,
3423            "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3424        );
3425        assert_eq!(
3426            user_deleted_count.first().copied().unwrap_or(0),
3427            1,
3428            "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3429        );
3430    }
3431
3432    /// #2529 idempotency: after soft-delete on first `load_history`, a second call must not
3433    /// re-load the soft-deleted orphans. This ensures the WARN log does not repeat on the
3434    /// next session startup for the same orphaned messages.
3435    #[tokio::test]
3436    async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3437        use zeph_llm::provider::MessagePart;
3438
3439        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3440        let cid = memory.sqlite().create_conversation().await.unwrap();
3441        let sqlite = memory.sqlite();
3442
3443        // Normal anchor message.
3444        sqlite
3445            .save_message(cid, "user", "do something")
3446            .await
3447            .unwrap();
3448
3449        // Orphaned assistant[ToolUse] with legacy-only content.
3450        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3451            id: "call_idem".to_string(),
3452            name: "shell".to_string(),
3453            input: serde_json::json!({"command": "ls"}),
3454        }])
3455        .unwrap();
3456        sqlite
3457            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3458            .await
3459            .unwrap();
3460
3461        // Break the pair: insert a plain assistant message so the ToolUse is mid-history orphan.
3462        sqlite
3463            .save_message(cid, "assistant", "continuing")
3464            .await
3465            .unwrap();
3466
3467        // Orphaned user[ToolResult] with legacy-only content.
3468        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3469            tool_use_id: "call_idem".to_string(),
3470            content: "output".to_string(),
3471            is_error: false,
3472        }])
3473        .unwrap();
3474        sqlite
3475            .save_message_with_parts(
3476                cid,
3477                "user",
3478                "[tool_result: call_idem]\noutput",
3479                &result_parts,
3480            )
3481            .await
3482            .unwrap();
3483
3484        sqlite
3485            .save_message(cid, "assistant", "final")
3486            .await
3487            .unwrap();
3488
3489        let memory_arc = std::sync::Arc::new(memory);
3490
3491        // First session: load_history performs soft-delete of the orphaned pair.
3492        let mut agent1 = Agent::new(
3493            mock_provider(vec![]),
3494            MockChannel::new(vec![]),
3495            create_test_registry(),
3496            None,
3497            5,
3498            MockToolExecutor::no_tools(),
3499        )
3500        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3501        agent1.load_history().await.unwrap();
3502        let count_after_first = agent1.msg.messages.len();
3503
3504        // Second session: a fresh agent loading the same conversation must not see the
3505        // soft-deleted orphans — the WARN log must not repeat.
3506        let mut agent2 = Agent::new(
3507            mock_provider(vec![]),
3508            MockChannel::new(vec![]),
3509            create_test_registry(),
3510            None,
3511            5,
3512            MockToolExecutor::no_tools(),
3513        )
3514        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3515        agent2.load_history().await.unwrap();
3516        let count_after_second = agent2.msg.messages.len();
3517
3518        // Both sessions must load the same number of messages — soft-deleted orphans excluded.
3519        assert_eq!(
3520            count_after_first, count_after_second,
3521            "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3522        );
3523    }
3524
3525    /// Edge case for #2529: an orphaned assistant message whose content has BOTH meaningful text
3526    /// AND a `tool_use` tag must NOT be soft-deleted. The `ToolUse` parts are stripped but the
3527    /// message is kept because it has human-readable content.
3528    #[tokio::test]
3529    async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3530        use zeph_llm::provider::MessagePart;
3531
3532        let provider = mock_provider(vec![]);
3533        let channel = MockChannel::new(vec![]);
3534        let registry = create_test_registry();
3535        let executor = MockToolExecutor::no_tools();
3536
3537        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3538        let cid = memory.sqlite().create_conversation().await.unwrap();
3539        let sqlite = memory.sqlite();
3540
3541        // Normal first exchange.
3542        sqlite
3543            .save_message(cid, "user", "check the files")
3544            .await
3545            .unwrap();
3546
3547        // Assistant message: has BOTH meaningful text AND a ToolUse part.
3548        // Content contains real prose + legacy tag — has_meaningful_content must return true.
3549        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3550            id: "call_mixed".to_string(),
3551            name: "shell".to_string(),
3552            input: serde_json::json!({"command": "ls"}),
3553        }])
3554        .unwrap();
3555        sqlite
3556            .save_message_with_parts(
3557                cid,
3558                "assistant",
3559                "Let me list the directory. [tool_use: shell(call_mixed)]",
3560                &use_parts,
3561            )
3562            .await
3563            .unwrap();
3564
3565        // No matching ToolResult follows — the ToolUse is orphaned.
3566        sqlite.save_message(cid, "user", "thanks").await.unwrap();
3567        sqlite
3568            .save_message(cid, "assistant", "you are welcome")
3569            .await
3570            .unwrap();
3571
3572        let memory_arc = std::sync::Arc::new(memory);
3573        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3574            memory_arc.clone(),
3575            cid,
3576            50,
3577            5,
3578            100,
3579        );
3580
3581        let messages_before = agent.msg.messages.len();
3582        agent.load_history().await.unwrap();
3583
3584        // All 4 messages must be present — the mixed-content assistant message is KEPT.
3585        assert_eq!(
3586            agent.msg.messages.len(),
3587            messages_before + 4,
3588            "assistant message with text + tool tag must not be removed after ToolUse strip"
3589        );
3590
3591        // The mixed-content assistant message must have its ToolUse parts stripped.
3592        let mixed_msg = agent
3593            .msg
3594            .messages
3595            .iter()
3596            .find(|m| m.content.contains("Let me list the directory"))
3597            .expect("mixed-content assistant message must still be in history");
3598        assert!(
3599            !mixed_msg
3600                .parts
3601                .iter()
3602                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3603            "orphaned ToolUse parts must be stripped even when message has meaningful text"
3604        );
3605        assert_eq!(
3606            mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3607            "content field must be unchanged — only parts are stripped"
3608        );
3609    }
3610
3611    // ── [skipped]/[stopped] ToolResult embedding guard (#2620) ──────────────
3612
3613    #[tokio::test]
3614    async fn persist_message_skipped_tool_result_does_not_embed() {
3615        use zeph_llm::provider::MessagePart;
3616
3617        let provider = mock_provider(vec![]);
3618        let channel = MockChannel::new(vec![]);
3619        let registry = create_test_registry();
3620        let executor = MockToolExecutor::no_tools();
3621
3622        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3623        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3624        let cid = memory.sqlite().create_conversation().await.unwrap();
3625
3626        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3627            .with_metrics(tx)
3628            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3629        agent.memory_state.persistence.autosave_assistant = true;
3630        agent.memory_state.persistence.autosave_min_length = 0;
3631
3632        let parts = vec![MessagePart::ToolResult {
3633            tool_use_id: "tu1".into(),
3634            content: "[skipped] bash tool was blocked by utility gate".into(),
3635            is_error: false,
3636        }];
3637
3638        agent
3639            .persist_message(
3640                Role::User,
3641                "[skipped] bash tool was blocked by utility gate",
3642                &parts,
3643                false,
3644            )
3645            .await;
3646
3647        assert_eq!(
3648            rx.borrow().embeddings_generated,
3649            0,
3650            "[skipped] ToolResult must not be embedded into Qdrant"
3651        );
3652    }
3653
3654    #[tokio::test]
3655    async fn persist_message_stopped_tool_result_does_not_embed() {
3656        use zeph_llm::provider::MessagePart;
3657
3658        let provider = mock_provider(vec![]);
3659        let channel = MockChannel::new(vec![]);
3660        let registry = create_test_registry();
3661        let executor = MockToolExecutor::no_tools();
3662
3663        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3664        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3665        let cid = memory.sqlite().create_conversation().await.unwrap();
3666
3667        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3668            .with_metrics(tx)
3669            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3670        agent.memory_state.persistence.autosave_assistant = true;
3671        agent.memory_state.persistence.autosave_min_length = 0;
3672
3673        let parts = vec![MessagePart::ToolResult {
3674            tool_use_id: "tu2".into(),
3675            content: "[stopped] execution limit reached".into(),
3676            is_error: false,
3677        }];
3678
3679        agent
3680            .persist_message(
3681                Role::User,
3682                "[stopped] execution limit reached",
3683                &parts,
3684                false,
3685            )
3686            .await;
3687
3688        assert_eq!(
3689            rx.borrow().embeddings_generated,
3690            0,
3691            "[stopped] ToolResult must not be embedded into Qdrant"
3692        );
3693    }
3694
3695    #[tokio::test]
3696    async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3697        // Regression: a normal ToolResult (no [skipped]/[stopped] prefix) must not be
3698        // suppressed by the utility-gate guard and must reach the save path (SQLite).
3699        use zeph_llm::provider::MessagePart;
3700
3701        let provider = mock_provider(vec![]);
3702        let channel = MockChannel::new(vec![]);
3703        let registry = create_test_registry();
3704        let executor = MockToolExecutor::no_tools();
3705
3706        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3707        let cid = memory.sqlite().create_conversation().await.unwrap();
3708        let memory_arc = std::sync::Arc::new(memory);
3709
3710        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3711            memory_arc.clone(),
3712            cid,
3713            50,
3714            5,
3715            100,
3716        );
3717        agent.memory_state.persistence.autosave_assistant = true;
3718        agent.memory_state.persistence.autosave_min_length = 0;
3719
3720        let content = "total 42\ndrwxr-xr-x  5 user group";
3721        let parts = vec![MessagePart::ToolResult {
3722            tool_use_id: "tu3".into(),
3723            content: content.into(),
3724            is_error: false,
3725        }];
3726
3727        agent
3728            .persist_message(Role::User, content, &parts, false)
3729            .await;
3730
3731        // Must be saved to SQLite — confirms the guard did not block this path.
3732        let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3733        assert_eq!(
3734            history.len(),
3735            1,
3736            "normal ToolResult must be saved to SQLite"
3737        );
3738        assert_eq!(history[0].content, content);
3739    }
3740
3741    /// Verify that `enqueue_trajectory_extraction_task` uses a bounded tail slice instead of
3742    /// cloning the full message vec. We confirm the slice logic by checking that the
3743    /// `tail_start` calculation correctly bounds the window even with more messages than
3744    /// `max_messages`.
3745    #[test]
3746    fn trajectory_extraction_slice_bounds_messages() {
3747        // Replicate the slice logic from enqueue_trajectory_extraction_task.
3748        let max_messages: usize = 20;
3749        let total_messages = 100usize;
3750
3751        let tail_start = total_messages.saturating_sub(max_messages);
3752        let window = total_messages - tail_start;
3753
3754        assert_eq!(
3755            window, 20,
3756            "slice should contain exactly max_messages items"
3757        );
3758        assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3759    }
3760
3761    #[test]
3762    fn trajectory_extraction_slice_handles_few_messages() {
3763        let max_messages: usize = 20;
3764        let total_messages = 5usize;
3765
3766        let tail_start = total_messages.saturating_sub(max_messages);
3767        let window = total_messages - tail_start;
3768
3769        assert_eq!(window, 5, "should return all messages when fewer than max");
3770        assert_eq!(tail_start, 0, "slice should start from the beginning");
3771    }
3772
3773    // --- #3168 regression tests ---
3774
3775    /// Round-trip: persist `Assistant[tool_use]` + `User[tool_result]`, then `load_history`.
3776    /// After `sanitize_tool_pairs` the pair must be intact — no WARN, both messages present
3777    /// with non-empty parts.
3778    #[tokio::test]
3779    async fn regression_3168_complete_tool_pair_survives_round_trip() {
3780        use zeph_llm::provider::MessagePart;
3781
3782        let provider = mock_provider(vec![]);
3783        let channel = MockChannel::new(vec![]);
3784        let registry = create_test_registry();
3785        let executor = MockToolExecutor::no_tools();
3786
3787        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3788        let cid = memory.sqlite().create_conversation().await.unwrap();
3789        let sqlite = memory.sqlite();
3790
3791        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3792            id: "r3168_call".to_string(),
3793            name: "shell".to_string(),
3794            input: serde_json::json!({"command": "echo hi"}),
3795        }])
3796        .unwrap();
3797        sqlite
3798            .save_message_with_parts(
3799                cid,
3800                "assistant",
3801                "[tool_use: shell(r3168_call)]",
3802                &use_parts,
3803            )
3804            .await
3805            .unwrap();
3806
3807        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3808            tool_use_id: "r3168_call".to_string(),
3809            content: "[skipped]".to_string(),
3810            is_error: false,
3811        }])
3812        .unwrap();
3813        sqlite
3814            .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3815            .await
3816            .unwrap();
3817
3818        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3819            std::sync::Arc::new(memory),
3820            cid,
3821            50,
3822            5,
3823            100,
3824        );
3825
3826        let base = agent.msg.messages.len();
3827        agent.load_history().await.unwrap();
3828
3829        assert_eq!(
3830            agent.msg.messages.len(),
3831            base + 2,
3832            "both messages of the complete pair must survive load_history"
3833        );
3834
3835        let assistant_msg = agent
3836            .msg
3837            .messages
3838            .iter()
3839            .find(|m| m.role == Role::Assistant)
3840            .expect("assistant message missing after load_history");
3841        assert!(
3842            assistant_msg
3843                .parts
3844                .iter()
3845                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3846            "ToolUse part must be preserved in assistant message"
3847        );
3848
3849        let user_msg = agent
3850            .msg
3851            .messages
3852            .iter()
3853            .rev()
3854            .find(|m| m.role == Role::User)
3855            .expect("user message missing after load_history");
3856        assert!(
3857            user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3858            "ToolResult part must be preserved in user message"
3859        );
3860    }
3861
3862    /// A System message between an `Assistant[tool_use]` and the matching `User[tool_result]`
3863    /// must not cause the `tool_use` to be treated as an orphan.
3864    #[test]
3865    fn regression_3168_system_between_tool_pair_not_stripped() {
3866        use zeph_llm::provider::{MessageMetadata, MessagePart};
3867
3868        let tool_id = "call_sys_between".to_string();
3869
3870        let mut messages = vec![
3871            Message {
3872                role: Role::Assistant,
3873                content: "[tool_use: shell(call_sys_between)]".to_string(),
3874                parts: vec![MessagePart::ToolUse {
3875                    id: tool_id.clone(),
3876                    name: "shell".to_string(),
3877                    input: serde_json::json!({"command": "ls"}),
3878                }],
3879                metadata: MessageMetadata::default(),
3880            },
3881            Message {
3882                role: Role::System,
3883                content: "system hint injected between tool calls".to_string(),
3884                parts: vec![],
3885                metadata: MessageMetadata::default(),
3886            },
3887            Message {
3888                role: Role::User,
3889                content: "[tool_result: call_sys_between]".to_string(),
3890                parts: vec![MessagePart::ToolResult {
3891                    tool_use_id: tool_id.clone(),
3892                    content: "output".to_string(),
3893                    is_error: false,
3894                }],
3895                metadata: MessageMetadata::default(),
3896            },
3897        ];
3898
3899        let (removed, _db_ids) = strip_mid_history_orphans(&mut messages);
3900
3901        assert_eq!(
3902            removed, 0,
3903            "no messages should be removed when System sits between a matched tool_use/tool_result pair"
3904        );
3905        assert_eq!(messages.len(), 3, "all three messages must remain");
3906        assert!(
3907            messages[0]
3908                .parts
3909                .iter()
3910                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == &tool_id)),
3911            "ToolUse part must not be stripped from assistant message"
3912        );
3913    }
3914
3915    /// If parts serialization fails, `persist_message` must return early and not store
3916    /// a row with empty parts that would create an orphaned `tool_use` on next session load.
3917    /// We verify this by writing a row with invalid `parts_json` directly and confirming
3918    /// `load_history` skips it (empty parts row is not injected into the agent).
3919    #[tokio::test]
3920    async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3921        use zeph_llm::provider::MessagePart;
3922
3923        let provider = mock_provider(vec![]);
3924        let channel = MockChannel::new(vec![]);
3925        let registry = create_test_registry();
3926        let executor = MockToolExecutor::no_tools();
3927
3928        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3929        let cid = memory.sqlite().create_conversation().await.unwrap();
3930        let sqlite = memory.sqlite();
3931
3932        // Simulate the pre-fix bug: assistant message stored with empty parts_json "[]"
3933        // even though it should have had a ToolUse part. This is what persist_message used
3934        // to do before the early-return fix.
3935        sqlite
3936            .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3937            .await
3938            .unwrap();
3939
3940        // User message with the matching tool_result stored correctly.
3941        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3942            tool_use_id: "corrupt".to_string(),
3943            content: "result".to_string(),
3944            is_error: false,
3945        }])
3946        .unwrap();
3947        sqlite
3948            .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3949            .await
3950            .unwrap();
3951
3952        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3953            std::sync::Arc::new(memory),
3954            cid,
3955            50,
3956            5,
3957            100,
3958        );
3959
3960        let base = agent.msg.messages.len();
3961        agent.load_history().await.unwrap();
3962
3963        // The user ToolResult has no matching ToolUse in the assistant message (parts="[]"),
3964        // so sanitize_tool_pairs must strip the orphaned ToolResult.
3965        // Net result: only the content-only assistant message survives; user msg is removed
3966        // because after stripping its only part it becomes empty.
3967        let loaded = agent.msg.messages.len() - base;
3968        // No message injected with orphaned ToolResult parts — either stripped entirely or
3969        // the ToolResult part removed. Verify no user message with ToolResult remains.
3970        let orphan_present = agent.msg.messages.iter().any(|m| {
3971            m.role == Role::User
3972                && m.parts.iter().any(|p| {
3973                    matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3974                })
3975        });
3976        assert!(
3977            !orphan_present,
3978            "orphaned ToolResult must not survive load_history; loaded={loaded}"
3979        );
3980    }
3981}