Skip to main content

zeph_core/agent/
persistence.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashSet;
5
6use crate::channel::Channel;
7use zeph_llm::provider::{LlmProvider as _, Message, MessagePart, Role};
8use zeph_memory::store::role_str;
9
10use super::Agent;
11
12/// Remove orphaned `ToolUse`/`ToolResult` messages from restored history.
13///
14/// Four failure modes are handled:
15/// 1. Trailing orphan: the last message is an assistant with `ToolUse` parts but no subsequent
16///    user message with `ToolResult` — caused by LIMIT boundary splits or interrupted sessions.
17/// 2. Leading orphan: the first message is a user with `ToolResult` parts but no preceding
18///    assistant message with `ToolUse` — caused by LIMIT boundary cuts.
19/// 3. Mid-history orphaned `ToolUse`: an assistant message with `ToolUse` parts is not followed
20///    by a user message with matching `ToolResult` parts. The `ToolUse` parts are stripped;
21///    if no content remains the message is removed.
22/// 4. Mid-history orphaned `ToolResult`: a user message has `ToolResult` parts whose
23///    `tool_use_id` is not present in the preceding assistant message. Those `ToolResult` parts
24///    are stripped; if no content remains the message is removed.
25///
26/// Boundary cases are resolved in a loop before the mid-history scan runs.
27fn sanitize_tool_pairs(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
28    let mut removed = 0;
29    let mut db_ids: Vec<i64> = Vec::new();
30
31    loop {
32        // Remove trailing orphaned tool_use (assistant message with ToolUse, no following tool_result).
33        if let Some(last) = messages.last()
34            && last.role == Role::Assistant
35            && last
36                .parts
37                .iter()
38                .any(|p| matches!(p, MessagePart::ToolUse { .. }))
39        {
40            let ids: Vec<String> = last
41                .parts
42                .iter()
43                .filter_map(|p| {
44                    if let MessagePart::ToolUse { id, .. } = p {
45                        Some(id.clone())
46                    } else {
47                        None
48                    }
49                })
50                .collect();
51            tracing::warn!(
52                tool_ids = ?ids,
53                "removing orphaned trailing tool_use message from restored history"
54            );
55            if let Some(db_id) = messages.last().and_then(|m| m.metadata.db_id) {
56                db_ids.push(db_id);
57            }
58            messages.pop();
59            removed += 1;
60            continue;
61        }
62
63        // Remove leading orphaned tool_result (user message with ToolResult, no preceding tool_use).
64        if let Some(first) = messages.first()
65            && first.role == Role::User
66            && first
67                .parts
68                .iter()
69                .any(|p| matches!(p, MessagePart::ToolResult { .. }))
70        {
71            let ids: Vec<String> = first
72                .parts
73                .iter()
74                .filter_map(|p| {
75                    if let MessagePart::ToolResult { tool_use_id, .. } = p {
76                        Some(tool_use_id.clone())
77                    } else {
78                        None
79                    }
80                })
81                .collect();
82            tracing::warn!(
83                tool_use_ids = ?ids,
84                "removing orphaned leading tool_result message from restored history"
85            );
86            if let Some(db_id) = messages.first().and_then(|m| m.metadata.db_id) {
87                db_ids.push(db_id);
88            }
89            messages.remove(0);
90            removed += 1;
91            continue;
92        }
93
94        break;
95    }
96
97    // Mid-history scan: strip ToolUse parts from any assistant message whose tool IDs are not
98    // matched by ToolResult parts in the immediately following user message.
99    let (mid_removed, mid_db_ids) = strip_mid_history_orphans(messages);
100    removed += mid_removed;
101    db_ids.extend(mid_db_ids);
102
103    (removed, db_ids)
104}
105
106/// Collect `tool_use` IDs from `msg` that have no matching `ToolResult` in `next_msg`.
107fn orphaned_tool_use_ids(msg: &Message, next_msg: Option<&Message>) -> HashSet<String> {
108    let matched: HashSet<String> = next_msg
109        .filter(|n| n.role == Role::User)
110        .map(|n| {
111            msg.parts
112                .iter()
113                .filter_map(|p| if let MessagePart::ToolUse { id, .. } = p { Some(id.clone()) } else { None })
114                .filter(|uid| n.parts.iter().any(|np| matches!(np, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == uid)))
115                .collect()
116        })
117        .unwrap_or_default();
118    msg.parts
119        .iter()
120        .filter_map(|p| {
121            if let MessagePart::ToolUse { id, .. } = p
122                && !matched.contains(id)
123            {
124                Some(id.clone())
125            } else {
126                None
127            }
128        })
129        .collect()
130}
131
132/// Collect `tool_result` IDs from `msg` that have no matching `ToolUse` in `prev_msg`.
133fn orphaned_tool_result_ids(msg: &Message, prev_msg: Option<&Message>) -> HashSet<String> {
134    let avail: HashSet<&str> = prev_msg
135        .filter(|p| p.role == Role::Assistant)
136        .map(|p| {
137            p.parts
138                .iter()
139                .filter_map(|part| {
140                    if let MessagePart::ToolUse { id, .. } = part {
141                        Some(id.as_str())
142                    } else {
143                        None
144                    }
145                })
146                .collect()
147        })
148        .unwrap_or_default();
149    msg.parts
150        .iter()
151        .filter_map(|p| {
152            if let MessagePart::ToolResult { tool_use_id, .. } = p
153                && !avail.contains(tool_use_id.as_str())
154            {
155                Some(tool_use_id.clone())
156            } else {
157                None
158            }
159        })
160        .collect()
161}
162
/// Reports whether `content` holds any human-readable text besides legacy tool markers.
///
/// The legacy markers written by `Message::flatten_parts` are:
/// - `[tool_use: name(id)]` — assistant `ToolUse`
/// - `[tool_result: id]\nbody` — user `ToolResult` (tag plus the body up to the next tag)
/// - `[tool output: name] body` — `ToolOutput` (pruned or inline)
/// - `[tool output: name]\n```\nbody\n``` ` — `ToolOutput` fenced block
///
/// Content made up only of such markers and whitespace yields `false`; such a message has no
/// user-visible text and may be soft-deleted once its structured `parts` are gone.
///
/// Conservative rule: a tag without a closing `]` is malformed, and the content is then
/// reported as meaningful so the message is NOT deleted.
///
/// Note: `[image: mime, N bytes]` placeholders are deliberately treated as meaningful, since
/// they stand for real media content rather than tool-execution artifacts.
///
/// Note: the Claude request-builder format `[tool_use: name] {json_input}` is used only for
/// API payload construction and is never written to `SQLite`, so it needs no handling here.
fn has_meaningful_content(content: &str) -> bool {
    const PREFIXES: [&str; 3] = ["[tool_use: ", "[tool_result: ", "[tool output: "];

    /// Locate the first tool tag in `text`, returning its byte offset and the matched prefix.
    fn earliest_tag(text: &str) -> Option<(usize, &'static str)> {
        let mut best: Option<(usize, &'static str)> = None;
        for prefix in PREFIXES {
            if let Some(pos) = text.find(prefix) {
                if best.map_or(true, |(best_pos, _)| pos < best_pos) {
                    best = Some((pos, prefix));
                }
            }
        }
        best
    }

    let mut rest = content.trim();

    while let Some((tag_start, tag_prefix)) = earliest_tag(rest) {
        let (before_tag, from_tag) = rest.split_at(tag_start);

        // Anything other than whitespace ahead of the tag is real text.
        if !before_tag.trim().is_empty() {
            return true;
        }

        // A tag that never closes is malformed: be conservative and keep the message.
        let after_prefix = &from_tag[tag_prefix.len()..];
        let Some(close) = after_prefix.find(']') else {
            return true;
        };
        let after_tag = &after_prefix[close + 1..];

        rest = if tag_prefix == "[tool_use: " {
            // A tool_use tag has no body; scanning resumes right after the `]`.
            after_tag
        } else {
            // tool_result / tool output: the body up to the next tag (or the end of the
            // string) belongs to the tool artifact and is skipped.
            let body = after_tag.trim_start_matches('\n');
            match earliest_tag(body) {
                Some((offset, _)) => &body[offset..],
                None => "",
            }
        };
    }

    // No tags left: whatever remains decides the verdict.
    !rest.trim().is_empty()
}
232
233/// Scan all messages and strip orphaned `ToolUse`/`ToolResult` parts from mid-history messages.
234///
235/// Two directions are checked:
236/// - Forward: assistant message has `ToolUse` parts not matched by `ToolResult` in the next user
237///   message — strip those `ToolUse` parts.
238/// - Reverse: user message has `ToolResult` parts whose `tool_use_id` is not present as a
239///   `ToolUse` in the preceding assistant message — strip those `ToolResult` parts.
240///
241/// Text parts are preserved; messages with no remaining content are removed entirely.
242///
243/// Returns `(count, db_ids)` where `count` is the number of messages removed entirely and
244/// `db_ids` contains the `metadata.db_id` values of those removed messages (for DB cleanup).
245fn strip_mid_history_orphans(messages: &mut Vec<Message>) -> (usize, Vec<i64>) {
246    let mut removed = 0;
247    let mut db_ids: Vec<i64> = Vec::new();
248    let mut i = 0;
249    while i < messages.len() {
250        // Forward pass: strip ToolUse parts from assistant messages that lack a matching
251        // ToolResult in the next user message. Only orphaned IDs are stripped — other ToolUse
252        // parts in the same message that DO have a matching ToolResult are preserved.
253        if messages[i].role == Role::Assistant
254            && messages[i]
255                .parts
256                .iter()
257                .any(|p| matches!(p, MessagePart::ToolUse { .. }))
258        {
259            let next_non_system = (i + 1..messages.len())
260                .find(|&j| messages[j].role != Role::System)
261                .and_then(|j| messages.get(j));
262            let orphaned_ids = orphaned_tool_use_ids(&messages[i], next_non_system);
263            if !orphaned_ids.is_empty() {
264                tracing::warn!(
265                    tool_ids = ?orphaned_ids,
266                    index = i,
267                    "stripping orphaned mid-history tool_use parts from assistant message"
268                );
269                messages[i].parts.retain(
270                    |p| !matches!(p, MessagePart::ToolUse { id, .. } if orphaned_ids.contains(id)),
271                );
272                let is_empty =
273                    !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
274                if is_empty {
275                    if let Some(db_id) = messages[i].metadata.db_id {
276                        db_ids.push(db_id);
277                    }
278                    messages.remove(i);
279                    removed += 1;
280                    continue; // Do not advance i — the next message is now at position i.
281                }
282            }
283        }
284
285        // Reverse pass: user ToolResult without matching ToolUse in the preceding assistant message.
286        if messages[i].role == Role::User
287            && messages[i]
288                .parts
289                .iter()
290                .any(|p| matches!(p, MessagePart::ToolResult { .. }))
291        {
292            let prev_non_system = (0..i)
293                .rev()
294                .find(|&j| messages[j].role != Role::System)
295                .and_then(|j| messages.get(j));
296            let orphaned_ids = orphaned_tool_result_ids(&messages[i], prev_non_system);
297            if !orphaned_ids.is_empty() {
298                tracing::warn!(
299                    tool_use_ids = ?orphaned_ids,
300                    index = i,
301                    "stripping orphaned mid-history tool_result parts from user message"
302                );
303                messages[i].parts.retain(|p| {
304                    !matches!(p, MessagePart::ToolResult { tool_use_id, .. } if orphaned_ids.contains(tool_use_id.as_str()))
305                });
306
307                let is_empty =
308                    !has_meaningful_content(&messages[i].content) && messages[i].parts.is_empty();
309                if is_empty {
310                    if let Some(db_id) = messages[i].metadata.db_id {
311                        db_ids.push(db_id);
312                    }
313                    messages.remove(i);
314                    removed += 1;
315                    // Do not advance i — the next message is now at position i.
316                    continue;
317                }
318            }
319        }
320
321        i += 1;
322    }
323    (removed, db_ids)
324}
325
326impl<C: Channel> Agent<C> {
327    /// Load conversation history from memory and inject into messages.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if loading history from `SQLite` fails.
332    pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
333        let (Some(memory), Some(cid)) = (
334            &self.memory_state.persistence.memory,
335            self.memory_state.persistence.conversation_id,
336        ) else {
337            return Ok(());
338        };
339
340        let history = memory
341            .sqlite()
342            .load_history_filtered(
343                cid,
344                self.memory_state.persistence.history_limit,
345                Some(true),
346                None,
347            )
348            .await?;
349        if !history.is_empty() {
350            let mut loaded = 0;
351            let mut skipped = 0;
352
353            for msg in history {
354                // Only skip messages that have neither text content nor structured parts.
355                // Native tool calls produce user messages with empty `content` but non-empty
356                // `parts` (containing ToolResult). Skipping them here would orphan the
357                // preceding assistant ToolUse before sanitize_tool_pairs can clean it up.
358                if !has_meaningful_content(&msg.content) && msg.parts.is_empty() {
359                    tracing::warn!("skipping empty message from history (role: {:?})", msg.role);
360                    skipped += 1;
361                    continue;
362                }
363                self.msg.messages.push(msg);
364                loaded += 1;
365            }
366
367            // Determine the start index of just-loaded messages (system prompt is at index 0).
368            let history_start = self.msg.messages.len() - loaded;
369            let mut restored_slice = self.msg.messages.split_off(history_start);
370            let (orphans, orphan_db_ids) = sanitize_tool_pairs(&mut restored_slice);
371            skipped += orphans;
372            loaded = loaded.saturating_sub(orphans);
373            self.msg.messages.append(&mut restored_slice);
374
375            if !orphan_db_ids.is_empty() {
376                let ids: Vec<zeph_memory::types::MessageId> = orphan_db_ids
377                    .iter()
378                    .map(|&id| zeph_memory::types::MessageId(id))
379                    .collect();
380                if let Err(e) = memory.sqlite().soft_delete_messages(&ids).await {
381                    tracing::warn!(
382                        count = ids.len(),
383                        error = %e,
384                        "failed to soft-delete orphaned tool-pair messages from DB"
385                    );
386                } else {
387                    tracing::debug!(
388                        count = ids.len(),
389                        "soft-deleted orphaned tool-pair messages from DB"
390                    );
391                }
392            }
393
394            tracing::info!("restored {loaded} message(s) from conversation {cid}");
395            if skipped > 0 {
396                tracing::warn!("skipped {skipped} empty/orphaned message(s) from history");
397            }
398
399            if loaded > 0 {
400                // Increment session counts so tier promotion can track cross-session access.
401                // Errors are non-fatal — promotion will simply use stale counts.
402                let _ = memory
403                    .sqlite()
404                    .increment_session_counts_for_conversation(cid)
405                    .await
406                    .inspect_err(|e| {
407                        tracing::warn!(error = %e, "failed to increment tier session counts");
408                    });
409            }
410        }
411
412        if let Ok(count) = memory.message_count(cid).await {
413            let count_u64 = u64::try_from(count).unwrap_or(0);
414            self.update_metrics(|m| {
415                m.sqlite_message_count = count_u64;
416            });
417        }
418
419        if let Ok(count) = memory.sqlite().count_semantic_facts().await {
420            let count_u64 = u64::try_from(count).unwrap_or(0);
421            self.update_metrics(|m| {
422                m.semantic_fact_count = count_u64;
423            });
424        }
425
426        if let Ok(count) = memory.unsummarized_message_count(cid).await {
427            self.memory_state.persistence.unsummarized_count = usize::try_from(count).unwrap_or(0);
428        }
429
430        self.recompute_prompt_tokens();
431        Ok(())
432    }
433
434    /// Persist a message to memory.
435    ///
436    /// `has_injection_flags` controls whether Qdrant embedding is skipped for this message.
437    /// When `true` and `guard_memory_writes` is enabled, only `SQLite` is written — the message
438    /// is saved for conversation continuity but will not pollute semantic search (M2, D2).
439    #[allow(clippy::too_many_lines)]
440    #[cfg_attr(
441        feature = "profiling",
442        tracing::instrument(name = "agent.persist_message", skip_all)
443    )]
444    pub(crate) async fn persist_message(
445        &mut self,
446        role: Role,
447        content: &str,
448        parts: &[MessagePart],
449        has_injection_flags: bool,
450    ) {
451        let (Some(memory), Some(cid)) = (
452            &self.memory_state.persistence.memory,
453            self.memory_state.persistence.conversation_id,
454        ) else {
455            return;
456        };
457
458        let parts_json = if parts.is_empty() {
459            "[]".to_string()
460        } else {
461            match serde_json::to_string(parts) {
462                Ok(json) => json,
463                Err(e) => {
464                    tracing::error!(
465                        role = ?role,
466                        parts_count = parts.len(),
467                        error = %e,
468                        "failed to serialize message parts — skipping persist to avoid orphaned tool pair"
469                    );
470                    return;
471                }
472            }
473        };
474
475        // M2: injection flag is passed explicitly to avoid stale mutable-bool state on Agent.
476        // When has_injection_flags=true, skip embedding to prevent poisoned content from
477        // polluting Qdrant semantic search results.
478        let guard_event = self
479            .security
480            .exfiltration_guard
481            .should_guard_memory_write(has_injection_flags);
482        if let Some(ref event) = guard_event {
483            tracing::warn!(
484                ?event,
485                "exfiltration guard: skipping Qdrant embedding for flagged content"
486            );
487            self.update_metrics(|m| m.exfiltration_memory_guards += 1);
488            self.push_security_event(
489                crate::metrics::SecurityEventCategory::ExfiltrationBlock,
490                "memory_write",
491                "Qdrant embedding skipped: flagged content",
492            );
493        }
494
495        let skip_embedding = guard_event.is_some();
496
497        // Do not embed [skipped] or [stopped] ToolResult content into Qdrant — these are
498        // internal policy markers that carry no useful semantic information and would
499        // contaminate memory_search results, causing the utility-gate Retrieve loop (#2620).
500        let has_skipped_tool_result = parts.iter().any(|p| {
501            if let MessagePart::ToolResult { content, .. } = p {
502                content.starts_with("[skipped]") || content.starts_with("[stopped]")
503            } else {
504                false
505            }
506        });
507
508        let should_embed = if skip_embedding || has_skipped_tool_result {
509            false
510        } else {
511            match role {
512                Role::Assistant => {
513                    self.memory_state.persistence.autosave_assistant
514                        && content.len() >= self.memory_state.persistence.autosave_min_length
515                }
516                _ => true,
517            }
518        };
519
520        let goal_text = self.memory_state.extraction.goal_text.clone();
521
522        tracing::debug!(
523            "persist_message: calling remember_with_parts, embed dispatched to background"
524        );
525        let (embedding_stored, was_persisted) = if should_embed {
526            match memory
527                .remember_with_parts(
528                    cid,
529                    role_str(role),
530                    content,
531                    &parts_json,
532                    goal_text.as_deref(),
533                )
534                .await
535            {
536                Ok((Some(message_id), stored)) => {
537                    self.msg.last_persisted_message_id = Some(message_id.0);
538                    (stored, true)
539                }
540                Ok((None, _)) => {
541                    // A-MAC admission rejected — skip increment and further processing.
542                    return;
543                }
544                Err(e) => {
545                    tracing::error!("failed to persist message: {e:#}");
546                    return;
547                }
548            }
549        } else {
550            match memory
551                .save_only(cid, role_str(role), content, &parts_json)
552                .await
553            {
554                Ok(message_id) => {
555                    self.msg.last_persisted_message_id = Some(message_id.0);
556                    (false, true)
557                }
558                Err(e) => {
559                    tracing::error!("failed to persist message: {e:#}");
560                    return;
561                }
562            }
563        };
564
565        if !was_persisted {
566            return;
567        }
568
569        self.memory_state.persistence.unsummarized_count += 1;
570
571        self.update_metrics(|m| {
572            m.sqlite_message_count += 1;
573            if embedding_stored {
574                m.embeddings_generated += 1;
575            }
576        });
577
578        tracing::debug!("persist_message: db insert complete, embedding running in background");
579        memory.reap_embed_tasks();
580
581        // Phase 2: enqueue enrichment tasks via supervisor (non-blocking).
582        // check_summarization signals completion via SummarizationSignal, consumed in reap()
583        // between turns — no shared mutable state across tasks (S1 fix).
584        self.enqueue_summarization_task();
585
586        // FIX-1: skip graph extraction for tool result messages — they contain raw structured
587        // output (TOML, JSON, code) that pollutes the entity graph with noise.
588        let has_tool_result_parts = parts
589            .iter()
590            .any(|p| matches!(p, MessagePart::ToolResult { .. }));
591
592        self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
593            .await;
594
595        // Persona extraction: run only for user messages that are not tool results and not injected.
596        if role == Role::User && !has_tool_result_parts && !has_injection_flags {
597            self.enqueue_persona_extraction_task();
598        }
599
600        // Trajectory extraction: run after turns that contained tool results.
601        if has_tool_result_parts {
602            self.enqueue_trajectory_extraction_task();
603        }
604
605        // ReasoningBank distillation: runs only after the final assistant message of a turn
606        // (C2 fix: skip intermediate tool-call messages). A message with ToolUse parts is an
607        // intermediate step; the final assistant message has no ToolUse parts.
608        // S-Med1: skip if injection patterns detected — mirrors graph extraction guard.
609        let has_tool_use_parts = parts
610            .iter()
611            .any(|p| matches!(p, MessagePart::ToolUse { .. }));
612        if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
613            self.enqueue_reasoning_extraction_task();
614        }
615    }
616
617    /// Enqueue background summarization via the supervisor (S1 fix: no shared `AtomicUsize`).
618    fn enqueue_summarization_task(&mut self) {
619        let (Some(memory), Some(cid)) = (
620            self.memory_state.persistence.memory.clone(),
621            self.memory_state.persistence.conversation_id,
622        ) else {
623            return;
624        };
625
626        if self.memory_state.persistence.unsummarized_count
627            <= self.memory_state.compaction.summarization_threshold
628        {
629            return;
630        }
631
632        let batch_size = self.memory_state.compaction.summarization_threshold / 2;
633
634        self.lifecycle.supervisor.spawn_summarization("summarization", async move {
635            match tokio::time::timeout(
636                std::time::Duration::from_secs(30),
637                memory.summarize(cid, batch_size),
638            )
639            .await
640            {
641                Ok(Ok(Some(summary_id))) => {
642                    tracing::info!(
643                        "background summarization: created summary {summary_id} for conversation {cid}"
644                    );
645                    true
646                }
647                Ok(Ok(None)) => {
648                    tracing::debug!("background summarization: no summarization needed");
649                    false
650                }
651                Ok(Err(e)) => {
652                    tracing::error!("background summarization failed: {e:#}");
653                    false
654                }
655                Err(_) => {
656                    tracing::warn!("background summarization timed out after 30s");
657                    false
658                }
659            }
660        });
661    }
662
663    /// Prepare graph extraction guards in foreground, then enqueue heavy work via supervisor.
664    ///
665    /// Guards (enabled check, injection/tool-result skip) stay on the foreground path.
666    /// The RPE check and actual extraction run in background (S2: no `send_status`).
667    #[allow(clippy::too_many_lines)]
668    async fn enqueue_graph_extraction_task(
669        &mut self,
670        content: &str,
671        has_injection_flags: bool,
672        has_tool_result_parts: bool,
673    ) {
674        use zeph_memory::semantic::GraphExtractionConfig;
675
676        if self.memory_state.persistence.memory.is_none()
677            || self.memory_state.persistence.conversation_id.is_none()
678        {
679            return;
680        }
681        if has_tool_result_parts {
682            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
683            return;
684        }
685        if has_injection_flags {
686            tracing::warn!("graph extraction skipped: injection patterns detected in content");
687            return;
688        }
689
690        let extraction_cfg = {
691            let cfg = &self.memory_state.extraction.graph_config;
692            if !cfg.enabled {
693                return;
694            }
695            GraphExtractionConfig {
696                max_entities: cfg.max_entities_per_message,
697                max_edges: cfg.max_edges_per_message,
698                extraction_timeout_secs: cfg.extraction_timeout_secs,
699                community_refresh_interval: cfg.community_refresh_interval,
700                expired_edge_retention_days: cfg.expired_edge_retention_days,
701                max_entities_cap: cfg.max_entities,
702                community_summary_max_prompt_bytes: cfg.community_summary_max_prompt_bytes,
703                community_summary_concurrency: cfg.community_summary_concurrency,
704                lpa_edge_chunk_size: cfg.lpa_edge_chunk_size,
705                note_linking: zeph_memory::NoteLinkingConfig {
706                    enabled: cfg.note_linking.enabled,
707                    similarity_threshold: cfg.note_linking.similarity_threshold,
708                    top_k: cfg.note_linking.top_k,
709                    timeout_secs: cfg.note_linking.timeout_secs,
710                },
711                link_weight_decay_lambda: cfg.link_weight_decay_lambda,
712                link_weight_decay_interval_secs: cfg.link_weight_decay_interval_secs,
713                belief_revision_enabled: cfg.belief_revision.enabled,
714                belief_revision_similarity_threshold: cfg.belief_revision.similarity_threshold,
715                conversation_id: self.memory_state.persistence.conversation_id.map(|c| c.0),
716            }
717        };
718
719        // RPE check: embed + compute surprise score. Stays on foreground to avoid
720        // capturing the rpe_router mutex in a background task.
721        if self.rpe_should_skip(content).await {
722            tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
723            return;
724        }
725
726        let context_messages: Vec<String> = self
727            .msg
728            .messages
729            .iter()
730            .rev()
731            .filter(|m| {
732                m.role == Role::User
733                    && !m
734                        .parts
735                        .iter()
736                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
737            })
738            .take(4)
739            .map(|m| {
740                if m.content.len() > 2048 {
741                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
742                } else {
743                    m.content.clone()
744                }
745            })
746            .collect();
747
748        let Some(memory) = self.memory_state.persistence.memory.clone() else {
749            return;
750        };
751
752        let validator: zeph_memory::semantic::PostExtractValidator =
753            if self.security.memory_validator.is_enabled() {
754                let v = self.security.memory_validator.clone();
755                Some(Box::new(move |result| {
756                    v.validate_graph_extraction(result)
757                        .map_err(|e| e.to_string())
758                }))
759            } else {
760                None
761            };
762
763        let content_owned = content.to_owned();
764        let graph_store = memory.graph_store.clone();
765        let metrics_tx = self.metrics.metrics_tx.clone();
766        let start_time = self.lifecycle.start_time;
767
768        self.lifecycle.supervisor.spawn(
769            super::agent_supervisor::TaskClass::Enrichment,
770            "graph_extraction",
771            async move {
772                let extraction_handle = memory.spawn_graph_extraction(
773                    content_owned,
774                    context_messages,
775                    extraction_cfg,
776                    validator,
777                );
778
779                // After extraction completes, refresh graph count metrics (was Telemetry spawn).
780                if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
781                    let _ = extraction_handle.await;
782                    let (entities, edges, communities) = tokio::join!(
783                        store.entity_count(),
784                        store.active_edge_count(),
785                        store.community_count()
786                    );
787                    let elapsed = start_time.elapsed().as_secs();
788                    tx.send_modify(|m| {
789                        m.uptime_seconds = elapsed;
790                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
791                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
792                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
793                    });
794                } else {
795                    let _ = extraction_handle.await;
796                }
797
798                tracing::debug!("background graph extraction complete");
799            },
800        );
801
802        // Sync community failures and extraction metrics (cheap, foreground-safe).
803        self.sync_community_detection_failures();
804        self.sync_graph_extraction_metrics();
805        // sync_graph_counts and sync_guidelines_status are DB reads; move to Telemetry background.
806        let memory_for_sync = self.memory_state.persistence.memory.clone();
807        let metrics_tx_sync = self.metrics.metrics_tx.clone();
808        let start_time_sync = self.lifecycle.start_time;
809        let cid_sync = self.memory_state.persistence.conversation_id;
810        let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
811        let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
812        let guidelines_enabled = self.memory_state.extraction.graph_config.enabled;
813
814        self.lifecycle.supervisor.spawn(
815            super::agent_supervisor::TaskClass::Telemetry,
816            "graph_count_sync",
817            async move {
818                let Some(store) = graph_store_sync else {
819                    return;
820                };
821                let Some(tx) = metrics_tx_sync else { return };
822
823                let (entities, edges, communities) = tokio::join!(
824                    store.entity_count(),
825                    store.active_edge_count(),
826                    store.community_count()
827                );
828                let elapsed = start_time_sync.elapsed().as_secs();
829                tx.send_modify(|m| {
830                    m.uptime_seconds = elapsed;
831                    m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
832                    m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
833                    m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
834                });
835
836                // Sync guidelines status.
837                if guidelines_enabled && let Some(sqlite) = sqlite_sync {
838                    match tokio::time::timeout(
839                        std::time::Duration::from_secs(10),
840                        sqlite.load_compression_guidelines_meta(cid_sync),
841                    )
842                    .await
843                    {
844                        Ok(Ok((version, created_at))) => {
845                            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
846                            let version_u32 = u32::try_from(version).unwrap_or(0);
847                            tx.send_modify(|m| {
848                                m.guidelines_version = version_u32;
849                                m.guidelines_updated_at = created_at;
850                            });
851                        }
852                        Ok(Err(e)) => {
853                            tracing::debug!("guidelines status sync failed: {e:#}");
854                        }
855                        Err(_) => {
856                            tracing::debug!("guidelines status sync timed out");
857                        }
858                    }
859                }
860            },
861        );
862    }
863
864    /// Enqueue persona extraction via supervisor (background, no `send_status`).
865    fn enqueue_persona_extraction_task(&mut self) {
866        use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
867
868        let cfg = &self.memory_state.extraction.persona_config;
869        if !cfg.enabled {
870            return;
871        }
872
873        let Some(memory) = &self.memory_state.persistence.memory else {
874            return;
875        };
876
877        let user_messages: Vec<String> = self
878            .msg
879            .messages
880            .iter()
881            .filter(|m| {
882                m.role == Role::User
883                    && !m
884                        .parts
885                        .iter()
886                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
887            })
888            .take(8)
889            .map(|m| {
890                if m.content.len() > 2048 {
891                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
892                } else {
893                    m.content.clone()
894                }
895            })
896            .collect();
897
898        if user_messages.len() < cfg.min_messages {
899            return;
900        }
901
902        let timeout_secs = cfg.extraction_timeout_secs;
903        let extraction_cfg = PersonaExtractionConfig {
904            enabled: cfg.enabled,
905            min_messages: cfg.min_messages,
906            max_messages: cfg.max_messages,
907            extraction_timeout_secs: timeout_secs,
908        };
909
910        let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
911        let store = memory.sqlite().clone();
912        let conversation_id = self.memory_state.persistence.conversation_id.map(|c| c.0);
913
914        self.lifecycle.supervisor.spawn(
915            super::agent_supervisor::TaskClass::Enrichment,
916            "persona_extraction",
917            async move {
918                let user_message_refs: Vec<&str> =
919                    user_messages.iter().map(String::as_str).collect();
920                let fut = extract_persona_facts(
921                    &store,
922                    &provider,
923                    &user_message_refs,
924                    &extraction_cfg,
925                    conversation_id,
926                );
927                match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
928                {
929                    Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
930                    Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
931                    Err(_) => tracing::warn!(
932                        timeout_secs,
933                        "persona extraction timed out — no facts written this turn"
934                    ),
935                }
936            },
937        );
938    }
939
940    /// Enqueue trajectory extraction via supervisor (background).
941    fn enqueue_trajectory_extraction_task(&mut self) {
942        use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
943
944        let cfg = self.memory_state.extraction.trajectory_config.clone();
945        if !cfg.enabled {
946            return;
947        }
948
949        let Some(memory) = &self.memory_state.persistence.memory else {
950            return;
951        };
952
953        let conversation_id = match self.memory_state.persistence.conversation_id {
954            Some(cid) => cid.0,
955            None => return,
956        };
957
958        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
959        let turn_messages: Vec<zeph_llm::provider::Message> =
960            self.msg.messages[tail_start..].to_vec();
961
962        if turn_messages.is_empty() {
963            return;
964        }
965
966        let extraction_cfg = TrajectoryExtractionConfig {
967            enabled: cfg.enabled,
968            max_messages: cfg.max_messages,
969            extraction_timeout_secs: cfg.extraction_timeout_secs,
970        };
971
972        let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
973        let store = memory.sqlite().clone();
974        let min_confidence = cfg.min_confidence;
975
976        self.lifecycle.supervisor.spawn(
977            super::agent_supervisor::TaskClass::Enrichment,
978            "trajectory_extraction",
979            async move {
980                let entries =
981                    match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
982                        .await
983                    {
984                        Ok(e) => e,
985                        Err(e) => {
986                            tracing::warn!(error = %e, "trajectory extraction failed");
987                            return;
988                        }
989                    };
990
991                let last_id = store
992                    .trajectory_last_extracted_message_id(conversation_id)
993                    .await
994                    .unwrap_or(0);
995
996                let mut max_id = last_id;
997                for entry in &entries {
998                    if entry.confidence < min_confidence {
999                        continue;
1000                    }
1001                    let tools_json = serde_json::to_string(&entry.tools_used)
1002                        .unwrap_or_else(|_| "[]".to_string());
1003                    match store
1004                        .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
1005                            conversation_id: Some(conversation_id),
1006                            turn_index: 0,
1007                            kind: &entry.kind,
1008                            intent: &entry.intent,
1009                            outcome: &entry.outcome,
1010                            tools_used: &tools_json,
1011                            confidence: entry.confidence,
1012                        })
1013                        .await
1014                    {
1015                        Ok(id) => {
1016                            if id > max_id {
1017                                max_id = id;
1018                            }
1019                        }
1020                        Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
1021                    }
1022                }
1023
1024                if max_id > last_id {
1025                    let _ = store
1026                        .set_trajectory_last_extracted_message_id(conversation_id, max_id)
1027                        .await;
1028                }
1029
1030                tracing::debug!(
1031                    count = entries.len(),
1032                    conversation_id,
1033                    "trajectory extraction complete"
1034                );
1035            },
1036        );
1037    }
1038
1039    /// Enqueue reasoning strategy distillation via supervisor (background, fire-and-forget).
1040    ///
1041    /// Mirrors [`Self::enqueue_trajectory_extraction_task`]. Runs after every assistant turn
1042    /// when `memory.reasoning.enabled = true` and a `ReasoningMemory` is attached.
1043    fn enqueue_reasoning_extraction_task(&mut self) {
1044        let cfg = self.memory_state.extraction.reasoning_config.clone();
1045        if !cfg.enabled {
1046            return;
1047        }
1048
1049        let Some(memory) = &self.memory_state.persistence.memory else {
1050            return;
1051        };
1052
1053        let Some(reasoning) = memory.reasoning.clone() else {
1054            return;
1055        };
1056
1057        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
1058        let turn_messages: Vec<zeph_llm::provider::Message> =
1059            self.msg.messages[tail_start..].to_vec();
1060
1061        if turn_messages.len() < cfg.min_messages {
1062            return;
1063        }
1064
1065        let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
1066        let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
1067        let embed_provider = memory.effective_embed_provider().clone();
1068        let store_limit = cfg.store_limit;
1069        let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
1070        let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
1071        let self_judge_window = cfg.self_judge_window;
1072        let min_assistant_chars = cfg.min_assistant_chars;
1073
1074        self.lifecycle.supervisor.spawn(
1075            super::agent_supervisor::TaskClass::Enrichment,
1076            "reasoning_extraction",
1077            async move {
1078                if let Err(e) = zeph_memory::process_reasoning_turn(
1079                    &reasoning,
1080                    &extract_provider,
1081                    &distill_provider,
1082                    &embed_provider,
1083                    &turn_messages,
1084                    zeph_memory::ProcessTurnConfig {
1085                        store_limit,
1086                        extraction_timeout,
1087                        distill_timeout,
1088                        self_judge_window,
1089                        min_assistant_chars,
1090                    },
1091                )
1092                .await
1093                {
1094                    tracing::warn!(error = %e, "reasoning: process_turn failed");
1095                }
1096
1097                tracing::debug!("reasoning extraction complete");
1098            },
1099        );
1100    }
1101
1102    /// D-MEM RPE check: returns `true` when the current turn should skip graph extraction.
1103    ///
1104    /// Embeds `content`, computes RPE via the router, and updates the router state.
1105    /// Returns `false` (do not skip) on any error — conservative fallback.
1106    async fn rpe_should_skip(&mut self, content: &str) -> bool {
1107        let Some(ref rpe_mutex) = self.memory_state.extraction.rpe_router else {
1108            return false;
1109        };
1110        let Some(memory) = &self.memory_state.persistence.memory else {
1111            return false;
1112        };
1113        let candidates = zeph_memory::extract_candidate_entities(content);
1114        let provider = memory.provider();
1115        let Ok(Ok(emb_vec)) =
1116            tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
1117        else {
1118            return false; // embed failed/timed out → extract
1119        };
1120        if let Ok(mut router) = rpe_mutex.lock() {
1121            let signal = router.compute(&emb_vec, &candidates);
1122            router.push_embedding(emb_vec);
1123            router.push_entities(&candidates);
1124            !signal.should_extract
1125        } else {
1126            tracing::warn!("rpe_router mutex poisoned; falling through to extract");
1127            false
1128        }
1129    }
1130}
1131
1132#[cfg(test)]
1133mod tests {
1134    use super::super::agent_tests::{
1135        MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
1136    };
1137    use super::*;
1138    use zeph_llm::any::AnyProvider;
1139    use zeph_memory::semantic::SemanticMemory;
1140
1141    async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
1142        SemanticMemory::new(
1143            ":memory:",
1144            "http://127.0.0.1:1",
1145            provider.clone(),
1146            "test-model",
1147        )
1148        .await
1149        .unwrap()
1150    }
1151
1152    #[tokio::test]
1153    async fn load_history_without_memory_returns_ok() {
1154        let provider = mock_provider(vec![]);
1155        let channel = MockChannel::new(vec![]);
1156        let registry = create_test_registry();
1157        let executor = MockToolExecutor::no_tools();
1158        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1159
1160        let result = agent.load_history().await;
1161        assert!(result.is_ok());
1162        // No messages added when no memory is configured
1163        assert_eq!(agent.msg.messages.len(), 1); // system prompt only
1164    }
1165
1166    #[tokio::test]
1167    async fn load_history_with_messages_injects_into_agent() {
1168        let provider = mock_provider(vec![]);
1169        let channel = MockChannel::new(vec![]);
1170        let registry = create_test_registry();
1171        let executor = MockToolExecutor::no_tools();
1172
1173        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1174        let cid = memory.sqlite().create_conversation().await.unwrap();
1175
1176        memory
1177            .sqlite()
1178            .save_message(cid, "user", "hello from history")
1179            .await
1180            .unwrap();
1181        memory
1182            .sqlite()
1183            .save_message(cid, "assistant", "hi back")
1184            .await
1185            .unwrap();
1186
1187        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1188            std::sync::Arc::new(memory),
1189            cid,
1190            50,
1191            5,
1192            100,
1193        );
1194
1195        let messages_before = agent.msg.messages.len();
1196        agent.load_history().await.unwrap();
1197        // Two messages were added from history
1198        assert_eq!(agent.msg.messages.len(), messages_before + 2);
1199    }
1200
1201    #[tokio::test]
1202    async fn load_history_skips_empty_messages() {
1203        let provider = mock_provider(vec![]);
1204        let channel = MockChannel::new(vec![]);
1205        let registry = create_test_registry();
1206        let executor = MockToolExecutor::no_tools();
1207
1208        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1209        let cid = memory.sqlite().create_conversation().await.unwrap();
1210
1211        // Save an empty message (should be skipped) and a valid one
1212        memory
1213            .sqlite()
1214            .save_message(cid, "user", "   ")
1215            .await
1216            .unwrap();
1217        memory
1218            .sqlite()
1219            .save_message(cid, "user", "real message")
1220            .await
1221            .unwrap();
1222
1223        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1224            std::sync::Arc::new(memory),
1225            cid,
1226            50,
1227            5,
1228            100,
1229        );
1230
1231        let messages_before = agent.msg.messages.len();
1232        agent.load_history().await.unwrap();
1233        // Only the non-empty message is loaded
1234        assert_eq!(agent.msg.messages.len(), messages_before + 1);
1235    }
1236
1237    #[tokio::test]
1238    async fn load_history_with_empty_store_returns_ok() {
1239        let provider = mock_provider(vec![]);
1240        let channel = MockChannel::new(vec![]);
1241        let registry = create_test_registry();
1242        let executor = MockToolExecutor::no_tools();
1243
1244        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1245        let cid = memory.sqlite().create_conversation().await.unwrap();
1246
1247        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1248            std::sync::Arc::new(memory),
1249            cid,
1250            50,
1251            5,
1252            100,
1253        );
1254
1255        let messages_before = agent.msg.messages.len();
1256        agent.load_history().await.unwrap();
1257        // No messages added — empty history
1258        assert_eq!(agent.msg.messages.len(), messages_before);
1259    }
1260
1261    #[tokio::test]
1262    async fn load_history_increments_session_count_for_existing_messages() {
1263        let provider = mock_provider(vec![]);
1264        let channel = MockChannel::new(vec![]);
1265        let registry = create_test_registry();
1266        let executor = MockToolExecutor::no_tools();
1267
1268        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1269        let cid = memory.sqlite().create_conversation().await.unwrap();
1270
1271        // Save two messages — they start with session_count = 0.
1272        let id1 = memory
1273            .sqlite()
1274            .save_message(cid, "user", "hello")
1275            .await
1276            .unwrap();
1277        let id2 = memory
1278            .sqlite()
1279            .save_message(cid, "assistant", "hi")
1280            .await
1281            .unwrap();
1282
1283        let memory_arc = std::sync::Arc::new(memory);
1284        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1285            memory_arc.clone(),
1286            cid,
1287            50,
1288            5,
1289            100,
1290        );
1291
1292        agent.load_history().await.unwrap();
1293
1294        // Both episodic messages must have session_count = 1 after restore.
1295        let counts: Vec<i64> = zeph_db::query_scalar(
1296            "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
1297        )
1298        .bind(id1)
1299        .bind(id2)
1300        .fetch_all(memory_arc.sqlite().pool())
1301        .await
1302        .unwrap();
1303        assert_eq!(
1304            counts,
1305            vec![1, 1],
1306            "session_count must be 1 after first restore"
1307        );
1308    }
1309
1310    #[tokio::test]
1311    async fn load_history_does_not_increment_session_count_for_new_conversation() {
1312        let provider = mock_provider(vec![]);
1313        let channel = MockChannel::new(vec![]);
1314        let registry = create_test_registry();
1315        let executor = MockToolExecutor::no_tools();
1316
1317        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1318        let cid = memory.sqlite().create_conversation().await.unwrap();
1319
1320        // No messages saved — empty conversation.
1321        let memory_arc = std::sync::Arc::new(memory);
1322        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1323            memory_arc.clone(),
1324            cid,
1325            50,
1326            5,
1327            100,
1328        );
1329
1330        agent.load_history().await.unwrap();
1331
1332        // No rows → no session_count increments → query returns empty.
1333        let counts: Vec<i64> =
1334            zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
1335                .bind(cid)
1336                .fetch_all(memory_arc.sqlite().pool())
1337                .await
1338                .unwrap();
1339        assert!(counts.is_empty(), "new conversation must have no messages");
1340    }
1341
1342    #[tokio::test]
1343    async fn persist_message_without_memory_silently_returns() {
1344        // No memory configured — persist_message must not panic
1345        let provider = mock_provider(vec![]);
1346        let channel = MockChannel::new(vec![]);
1347        let registry = create_test_registry();
1348        let executor = MockToolExecutor::no_tools();
1349        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1350
1351        // Must not panic and must complete
1352        agent.persist_message(Role::User, "hello", &[], false).await;
1353    }
1354
1355    #[tokio::test]
1356    async fn persist_message_assistant_autosave_false_uses_save_only() {
1357        let provider = mock_provider(vec![]);
1358        let channel = MockChannel::new(vec![]);
1359        let registry = create_test_registry();
1360        let executor = MockToolExecutor::no_tools();
1361
1362        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1363        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1364        let cid = memory.sqlite().create_conversation().await.unwrap();
1365
1366        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1367            .with_metrics(tx)
1368            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1369        agent.memory_state.persistence.autosave_assistant = false;
1370        agent.memory_state.persistence.autosave_min_length = 20;
1371
1372        agent
1373            .persist_message(Role::Assistant, "short assistant reply", &[], false)
1374            .await;
1375
1376        let history = agent
1377            .memory_state
1378            .persistence
1379            .memory
1380            .as_ref()
1381            .unwrap()
1382            .sqlite()
1383            .load_history(cid, 50)
1384            .await
1385            .unwrap();
1386        assert_eq!(history.len(), 1, "message must be saved");
1387        assert_eq!(history[0].content, "short assistant reply");
1388        // embeddings_generated must remain 0 — save_only path does not embed
1389        assert_eq!(rx.borrow().embeddings_generated, 0);
1390    }
1391
1392    #[tokio::test]
1393    async fn persist_message_assistant_below_min_length_uses_save_only() {
1394        let provider = mock_provider(vec![]);
1395        let channel = MockChannel::new(vec![]);
1396        let registry = create_test_registry();
1397        let executor = MockToolExecutor::no_tools();
1398
1399        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1400        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1401        let cid = memory.sqlite().create_conversation().await.unwrap();
1402
1403        // autosave_assistant=true but min_length=1000 — short content falls back to save_only
1404        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1405            .with_metrics(tx)
1406            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1407        agent.memory_state.persistence.autosave_assistant = true;
1408        agent.memory_state.persistence.autosave_min_length = 1000;
1409
1410        agent
1411            .persist_message(Role::Assistant, "too short", &[], false)
1412            .await;
1413
1414        let history = agent
1415            .memory_state
1416            .persistence
1417            .memory
1418            .as_ref()
1419            .unwrap()
1420            .sqlite()
1421            .load_history(cid, 50)
1422            .await
1423            .unwrap();
1424        assert_eq!(history.len(), 1, "message must be saved");
1425        assert_eq!(history[0].content, "too short");
1426        assert_eq!(rx.borrow().embeddings_generated, 0);
1427    }
1428
1429    #[tokio::test]
1430    async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1431        // content.len() == autosave_min_length → should_embed = true (>= boundary).
1432        let provider = mock_provider(vec![]);
1433        let channel = MockChannel::new(vec![]);
1434        let registry = create_test_registry();
1435        let executor = MockToolExecutor::no_tools();
1436
1437        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1438        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1439        let cid = memory.sqlite().create_conversation().await.unwrap();
1440
1441        let min_length = 10usize;
1442        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1443            .with_metrics(tx)
1444            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1445        agent.memory_state.persistence.autosave_assistant = true;
1446        agent.memory_state.persistence.autosave_min_length = min_length;
1447
1448        // Exact boundary: len == min_length → embed path.
1449        let content_at_boundary = "A".repeat(min_length);
1450        assert_eq!(content_at_boundary.len(), min_length);
1451        agent
1452            .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1453            .await;
1454
1455        // sqlite_message_count must be incremented regardless of embedding success.
1456        assert_eq!(rx.borrow().sqlite_message_count, 1);
1457    }
1458
1459    #[tokio::test]
1460    async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1461        // content.len() == autosave_min_length - 1 → should_embed = false (below boundary).
1462        let provider = mock_provider(vec![]);
1463        let channel = MockChannel::new(vec![]);
1464        let registry = create_test_registry();
1465        let executor = MockToolExecutor::no_tools();
1466
1467        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1468        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1469        let cid = memory.sqlite().create_conversation().await.unwrap();
1470
1471        let min_length = 10usize;
1472        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1473            .with_metrics(tx)
1474            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1475        agent.memory_state.persistence.autosave_assistant = true;
1476        agent.memory_state.persistence.autosave_min_length = min_length;
1477
1478        // One below boundary: len == min_length - 1 → save_only path, no embedding.
1479        let content_below_boundary = "A".repeat(min_length - 1);
1480        assert_eq!(content_below_boundary.len(), min_length - 1);
1481        agent
1482            .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1483            .await;
1484
1485        let history = agent
1486            .memory_state
1487            .persistence
1488            .memory
1489            .as_ref()
1490            .unwrap()
1491            .sqlite()
1492            .load_history(cid, 50)
1493            .await
1494            .unwrap();
1495        assert_eq!(history.len(), 1, "message must still be saved");
1496        // save_only path does not embed.
1497        assert_eq!(rx.borrow().embeddings_generated, 0);
1498    }
1499
1500    #[tokio::test]
1501    async fn persist_message_increments_unsummarized_count() {
1502        let provider = mock_provider(vec![]);
1503        let channel = MockChannel::new(vec![]);
1504        let registry = create_test_registry();
1505        let executor = MockToolExecutor::no_tools();
1506
1507        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1508        let cid = memory.sqlite().create_conversation().await.unwrap();
1509
1510        // threshold=100 ensures no summarization is triggered
1511        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1512            std::sync::Arc::new(memory),
1513            cid,
1514            50,
1515            5,
1516            100,
1517        );
1518
1519        assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1520
1521        agent.persist_message(Role::User, "first", &[], false).await;
1522        assert_eq!(agent.memory_state.persistence.unsummarized_count, 1);
1523
1524        agent
1525            .persist_message(Role::User, "second", &[], false)
1526            .await;
1527        assert_eq!(agent.memory_state.persistence.unsummarized_count, 2);
1528    }
1529
1530    #[tokio::test]
1531    async fn check_summarization_resets_counter_on_success() {
1532        let provider = mock_provider(vec![]);
1533        let channel = MockChannel::new(vec![]);
1534        let registry = create_test_registry();
1535        let executor = MockToolExecutor::no_tools();
1536
1537        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1538        let cid = memory.sqlite().create_conversation().await.unwrap();
1539
1540        // threshold=1 so the second persist triggers summarization check (count > threshold)
1541        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1542            std::sync::Arc::new(memory),
1543            cid,
1544            50,
1545            5,
1546            1,
1547        );
1548
1549        agent.persist_message(Role::User, "msg1", &[], false).await;
1550        agent.persist_message(Role::User, "msg2", &[], false).await;
1551
1552        // After summarization attempt (summarize returns Ok(None) since no messages qualify),
1553        // the counter is NOT reset to 0 — only reset on Ok(Some(_)).
1554        // This verifies check_summarization is called and the guard condition works.
1555        // unsummarized_count must be >= 2 before any summarization or 0 if summarization ran.
1556        assert!(agent.memory_state.persistence.unsummarized_count <= 2);
1557    }
1558
1559    #[tokio::test]
1560    async fn unsummarized_count_not_incremented_without_memory() {
1561        let provider = mock_provider(vec![]);
1562        let channel = MockChannel::new(vec![]);
1563        let registry = create_test_registry();
1564        let executor = MockToolExecutor::no_tools();
1565        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1566
1567        agent.persist_message(Role::User, "hello", &[], false).await;
1568        // No memory configured — persist_message returns early, counter must stay 0.
1569        assert_eq!(agent.memory_state.persistence.unsummarized_count, 0);
1570    }
1571
1572    // R-CRIT-01: unit tests for enqueue_graph_extraction_task guard conditions.
1573    mod graph_extraction_guards {
1574        use super::*;
1575        use crate::config::GraphConfig;
1576        use zeph_llm::provider::MessageMetadata;
1577        use zeph_memory::graph::GraphStore;
1578
1579        fn enabled_graph_config() -> GraphConfig {
1580            GraphConfig {
1581                enabled: true,
1582                ..GraphConfig::default()
1583            }
1584        }
1585
1586        async fn agent_with_graph(
1587            provider: &AnyProvider,
1588            config: GraphConfig,
1589        ) -> Agent<MockChannel> {
1590            let memory =
1591                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1592            let cid = memory.sqlite().create_conversation().await.unwrap();
1593            Agent::new(
1594                provider.clone(),
1595                MockChannel::new(vec![]),
1596                create_test_registry(),
1597                None,
1598                5,
1599                MockToolExecutor::no_tools(),
1600            )
1601            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1602            .with_graph_config(config)
1603        }
1604
1605        #[tokio::test]
1606        async fn injection_flag_guard_skips_extraction() {
1607            // has_injection_flags=true → extraction must be skipped; no counter in graph_metadata.
1608            let provider = mock_provider(vec![]);
1609            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1610            let pool = agent
1611                .memory_state
1612                .persistence
1613                .memory
1614                .as_ref()
1615                .unwrap()
1616                .sqlite()
1617                .pool()
1618                .clone();
1619
1620            agent
1621                .enqueue_graph_extraction_task("I use Rust", true, false)
1622                .await;
1623
1624            // Give any accidental spawn time to settle.
1625            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1626
1627            let store = GraphStore::new(pool);
1628            let count = store.get_metadata("extraction_count").await.unwrap();
1629            assert!(
1630                count.is_none(),
1631                "injection flag must prevent extraction_count from being written"
1632            );
1633        }
1634
1635        #[tokio::test]
1636        async fn disabled_config_guard_skips_extraction() {
1637            // graph.enabled=false → extraction must be skipped.
1638            let provider = mock_provider(vec![]);
1639            let disabled_cfg = GraphConfig {
1640                enabled: false,
1641                ..GraphConfig::default()
1642            };
1643            let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1644            let pool = agent
1645                .memory_state
1646                .persistence
1647                .memory
1648                .as_ref()
1649                .unwrap()
1650                .sqlite()
1651                .pool()
1652                .clone();
1653
1654            agent
1655                .enqueue_graph_extraction_task("I use Rust", false, false)
1656                .await;
1657
1658            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1659
1660            let store = GraphStore::new(pool);
1661            let count = store.get_metadata("extraction_count").await.unwrap();
1662            assert!(
1663                count.is_none(),
1664                "disabled graph config must prevent extraction"
1665            );
1666        }
1667
1668        #[tokio::test]
1669        async fn happy_path_fires_extraction() {
1670            // With enabled config and no injection flags, extraction is spawned.
1671            // MockProvider returns None (no entities), but the counter must be incremented.
1672            let provider = mock_provider(vec![]);
1673            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1674            let pool = agent
1675                .memory_state
1676                .persistence
1677                .memory
1678                .as_ref()
1679                .unwrap()
1680                .sqlite()
1681                .pool()
1682                .clone();
1683
1684            agent
1685                .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1686                .await;
1687
1688            // Wait for the spawned task to complete.
1689            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1690
1691            let store = GraphStore::new(pool);
1692            let count = store.get_metadata("extraction_count").await.unwrap();
1693            assert!(
1694                count.is_some(),
1695                "happy-path extraction must increment extraction_count"
1696            );
1697        }
1698
1699        #[tokio::test]
1700        async fn tool_result_parts_guard_skips_extraction() {
1701            // FIX-1 regression: has_tool_result_parts=true → extraction must be skipped.
1702            // Tool result messages contain raw structured output (TOML, JSON, code) — not
1703            // conversational content. Extracting entities from them produces graph noise.
1704            let provider = mock_provider(vec![]);
1705            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1706            let pool = agent
1707                .memory_state
1708                .persistence
1709                .memory
1710                .as_ref()
1711                .unwrap()
1712                .sqlite()
1713                .pool()
1714                .clone();
1715
1716            agent
1717                .enqueue_graph_extraction_task(
1718                    "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1719                    false,
1720                    true, // has_tool_result_parts
1721                )
1722                .await;
1723
1724            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1725
1726            let store = GraphStore::new(pool);
1727            let count = store.get_metadata("extraction_count").await.unwrap();
1728            assert!(
1729                count.is_none(),
1730                "tool result message must not trigger graph extraction"
1731            );
1732        }
1733
1734        #[tokio::test]
1735        async fn context_filter_excludes_tool_result_messages() {
1736            // FIX-2: context_messages must not include tool result user messages.
1737            // When enqueue_graph_extraction_task collects context, it filters out
1738            // Role::User messages that contain ToolResult parts — only conversational
1739            // user messages are included as extraction context.
1740            //
1741            // This test verifies the guard fires: a tool result message alone is passed
1742            // (has_tool_result_parts=true) → extraction is skipped entirely, so context
1743            // filtering is not exercised. We verify FIX-2 by ensuring a prior tool result
1744            // message in agent.msg.messages is excluded when a subsequent conversational message
1745            // triggers extraction.
1746            let provider = mock_provider(vec![]);
1747            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1748
1749            // Add a tool result message to the agent's message history — this simulates
1750            // a tool call response that arrived before the current conversational turn.
1751            agent.msg.messages.push(Message {
1752                role: Role::User,
1753                content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1754                parts: vec![MessagePart::ToolResult {
1755                    tool_use_id: "abc".to_owned(),
1756                    content: "provider_type = \"openai\"".to_owned(),
1757                    is_error: false,
1758                }],
1759                metadata: MessageMetadata::default(),
1760            });
1761
1762            let pool = agent
1763                .memory_state
1764                .persistence
1765                .memory
1766                .as_ref()
1767                .unwrap()
1768                .sqlite()
1769                .pool()
1770                .clone();
1771
1772            // Trigger extraction for a conversational message (not a tool result).
1773            agent
1774                .enqueue_graph_extraction_task(
1775                    "I prefer Rust for systems programming",
1776                    false,
1777                    false,
1778                )
1779                .await;
1780
1781            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1782
1783            // Extraction must have fired (conversational message, no injection flags).
1784            let store = GraphStore::new(pool);
1785            let count = store.get_metadata("extraction_count").await.unwrap();
1786            assert!(
1787                count.is_some(),
1788                "conversational message must trigger extraction even with prior tool result in history"
1789            );
1790        }
1791    }
1792
1793    // R-PERS-01: unit tests for enqueue_persona_extraction_task guard conditions.
1794    mod persona_extraction_guards {
1795        use super::*;
1796        use zeph_config::PersonaConfig;
1797        use zeph_llm::provider::MessageMetadata;
1798
1799        fn enabled_persona_config() -> PersonaConfig {
1800            PersonaConfig {
1801                enabled: true,
1802                min_messages: 1,
1803                ..PersonaConfig::default()
1804            }
1805        }
1806
1807        async fn agent_with_persona(
1808            provider: &AnyProvider,
1809            config: PersonaConfig,
1810        ) -> Agent<MockChannel> {
1811            let memory =
1812                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1813            let cid = memory.sqlite().create_conversation().await.unwrap();
1814            let mut agent = Agent::new(
1815                provider.clone(),
1816                MockChannel::new(vec![]),
1817                create_test_registry(),
1818                None,
1819                5,
1820                MockToolExecutor::no_tools(),
1821            )
1822            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1823            agent.memory_state.extraction.persona_config = config;
1824            agent
1825        }
1826
1827        #[tokio::test]
1828        async fn disabled_config_skips_spawn() {
1829            // persona.enabled=false → nothing is spawned; persona_memory stays empty.
1830            let provider = mock_provider(vec![]);
1831            let mut agent = agent_with_persona(
1832                &provider,
1833                PersonaConfig {
1834                    enabled: false,
1835                    ..PersonaConfig::default()
1836                },
1837            )
1838            .await;
1839
1840            // Inject a user message so message count is above threshold.
1841            agent.msg.messages.push(zeph_llm::provider::Message {
1842                role: Role::User,
1843                content: "I prefer Rust for systems programming".to_owned(),
1844                parts: vec![],
1845                metadata: MessageMetadata::default(),
1846            });
1847
1848            agent.enqueue_persona_extraction_task();
1849
1850            let store = agent
1851                .memory_state
1852                .persistence
1853                .memory
1854                .as_ref()
1855                .unwrap()
1856                .sqlite()
1857                .clone();
1858            let count = store.count_persona_facts().await.unwrap();
1859            assert_eq!(count, 0, "disabled persona config must not write any facts");
1860        }
1861
1862        #[tokio::test]
1863        async fn below_min_messages_skips_spawn() {
1864            // min_messages=3 but only 2 user messages → should skip.
1865            let provider = mock_provider(vec![]);
1866            let mut agent = agent_with_persona(
1867                &provider,
1868                PersonaConfig {
1869                    enabled: true,
1870                    min_messages: 3,
1871                    ..PersonaConfig::default()
1872                },
1873            )
1874            .await;
1875
1876            for text in ["I use Rust", "I prefer async code"] {
1877                agent.msg.messages.push(zeph_llm::provider::Message {
1878                    role: Role::User,
1879                    content: text.to_owned(),
1880                    parts: vec![],
1881                    metadata: MessageMetadata::default(),
1882                });
1883            }
1884
1885            agent.enqueue_persona_extraction_task();
1886
1887            let store = agent
1888                .memory_state
1889                .persistence
1890                .memory
1891                .as_ref()
1892                .unwrap()
1893                .sqlite()
1894                .clone();
1895            let count = store.count_persona_facts().await.unwrap();
1896            assert_eq!(
1897                count, 0,
1898                "below min_messages threshold must not trigger extraction"
1899            );
1900        }
1901
1902        #[tokio::test]
1903        async fn no_memory_skips_spawn() {
1904            // memory=None → must exit early without panic.
1905            let provider = mock_provider(vec![]);
1906            let channel = MockChannel::new(vec![]);
1907            let registry = create_test_registry();
1908            let executor = MockToolExecutor::no_tools();
1909            let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1910            agent.memory_state.extraction.persona_config = enabled_persona_config();
1911            agent.msg.messages.push(zeph_llm::provider::Message {
1912                role: Role::User,
1913                content: "I like Rust".to_owned(),
1914                parts: vec![],
1915                metadata: MessageMetadata::default(),
1916            });
1917
1918            // Must not panic even without memory.
1919            agent.enqueue_persona_extraction_task();
1920        }
1921
1922        #[tokio::test]
1923        async fn enabled_enough_messages_spawns_extraction() {
1924            // enabled=true, min_messages=1, self-referential message → extraction runs eagerly
1925            // (not fire-and-forget) and chat() is called on the provider, verified via MockProvider.
1926            use zeph_llm::mock::MockProvider;
1927            let (mock, recorded) = MockProvider::default().with_recording();
1928            let provider = AnyProvider::Mock(mock);
1929            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1930
1931            agent.msg.messages.push(zeph_llm::provider::Message {
1932                role: Role::User,
1933                content: "I prefer Rust for systems programming".to_owned(),
1934                parts: vec![],
1935                metadata: MessageMetadata::default(),
1936            });
1937
1938            agent.enqueue_persona_extraction_task();
1939
1940            // Persona extraction runs via BackgroundSupervisor. Wait for tasks to complete.
1941            agent.lifecycle.supervisor.join_all_for_test().await;
1942
1943            let calls = recorded.lock().unwrap();
1944            assert!(
1945                !calls.is_empty(),
1946                "happy-path: provider.chat() must be called when extraction completes"
1947            );
1948        }
1949
1950        #[tokio::test]
1951        async fn messages_capped_at_eight() {
1952            // More than 8 user messages → only 8 are passed to extraction.
1953            // Each message contains "I" so self-referential gate passes.
1954            use zeph_llm::mock::MockProvider;
1955            let (mock, recorded) = MockProvider::default().with_recording();
1956            let provider = AnyProvider::Mock(mock);
1957            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1958
1959            for i in 0..12u32 {
1960                agent.msg.messages.push(zeph_llm::provider::Message {
1961                    role: Role::User,
1962                    content: format!("I like message {i}"),
1963                    parts: vec![],
1964                    metadata: MessageMetadata::default(),
1965                });
1966            }
1967
1968            agent.enqueue_persona_extraction_task();
1969
1970            // Allow the background task to complete before asserting.
1971            agent.lifecycle.supervisor.join_all_for_test().await;
1972
1973            // Verify extraction ran (provider was called).
1974            let calls = recorded.lock().unwrap();
1975            assert!(
1976                !calls.is_empty(),
1977                "extraction must run when enough messages present"
1978            );
1979            // Verify the prompt sent to the provider does not contain messages beyond the 8th.
1980            let prompt = &calls[0];
1981            let user_text = prompt
1982                .iter()
1983                .filter(|m| m.role == Role::User)
1984                .map(|m| m.content.as_str())
1985                .collect::<Vec<_>>()
1986                .join(" ");
1987            // Messages 8..11 ("I like message 8".."I like message 11") must not appear.
1988            assert!(
1989                !user_text.contains("I like message 8"),
1990                "message index 8 must be excluded from extraction input"
1991            );
1992        }
1993
1994        #[test]
1995        fn long_message_truncated_at_char_boundary() {
1996            // Directly verify the per-message truncation logic applied in
1997            // enqueue_persona_extraction_task: a content > 2048 bytes must be capped
1998            // to exactly floor_char_boundary(2048).
1999            let long_content = "x".repeat(3000);
2000            let truncated = if long_content.len() > 2048 {
2001                long_content[..long_content.floor_char_boundary(2048)].to_owned()
2002            } else {
2003                long_content.clone()
2004            };
2005            assert_eq!(
2006                truncated.len(),
2007                2048,
2008                "ASCII content must be truncated to exactly 2048 bytes"
2009            );
2010
2011            // Multi-byte boundary: build a string whose char boundary falls before 2048.
2012            let multi = "é".repeat(1500); // each 'é' is 2 bytes → 3000 bytes total
2013            let truncated_multi = if multi.len() > 2048 {
2014                multi[..multi.floor_char_boundary(2048)].to_owned()
2015            } else {
2016                multi.clone()
2017            };
2018            assert!(
2019                truncated_multi.len() <= 2048,
2020                "multi-byte content must not exceed 2048 bytes"
2021            );
2022            assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
2023        }
2024    }
2025
2026    #[tokio::test]
2027    async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
2028        let provider = mock_provider(vec![]);
2029        let channel = MockChannel::new(vec![]);
2030        let registry = create_test_registry();
2031        let executor = MockToolExecutor::no_tools();
2032
2033        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
2034        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2035        let cid = memory.sqlite().create_conversation().await.unwrap();
2036
2037        // autosave_assistant=false — but User role always takes embedding path
2038        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
2039            .with_metrics(tx)
2040            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
2041        agent.memory_state.persistence.autosave_assistant = false;
2042        agent.memory_state.persistence.autosave_min_length = 20;
2043
2044        let long_user_msg = "A".repeat(100);
2045        agent
2046            .persist_message(Role::User, &long_user_msg, &[], false)
2047            .await;
2048
2049        let history = agent
2050            .memory_state
2051            .persistence
2052            .memory
2053            .as_ref()
2054            .unwrap()
2055            .sqlite()
2056            .load_history(cid, 50)
2057            .await
2058            .unwrap();
2059        assert_eq!(history.len(), 1, "user message must be saved");
2060        // User messages go through remember_with_parts (embedding path).
2061        // sqlite_message_count must increment regardless of Qdrant availability.
2062        assert_eq!(rx.borrow().sqlite_message_count, 1);
2063    }
2064
2065    // Round-trip tests: verify that persist_message saves the correct parts and they
2066    // are restored correctly by load_history.
2067
2068    #[tokio::test]
2069    async fn persist_message_saves_correct_tool_use_parts() {
2070        use zeph_llm::provider::MessagePart;
2071
2072        let provider = mock_provider(vec![]);
2073        let channel = MockChannel::new(vec![]);
2074        let registry = create_test_registry();
2075        let executor = MockToolExecutor::no_tools();
2076
2077        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2078        let cid = memory.sqlite().create_conversation().await.unwrap();
2079
2080        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2081            std::sync::Arc::new(memory),
2082            cid,
2083            50,
2084            5,
2085            100,
2086        );
2087
2088        let parts = vec![MessagePart::ToolUse {
2089            id: "call_abc123".to_string(),
2090            name: "read_file".to_string(),
2091            input: serde_json::json!({"path": "/tmp/test.txt"}),
2092        }];
2093        let content = "[tool_use: read_file(call_abc123)]";
2094
2095        agent
2096            .persist_message(Role::Assistant, content, &parts, false)
2097            .await;
2098
2099        let history = agent
2100            .memory_state
2101            .persistence
2102            .memory
2103            .as_ref()
2104            .unwrap()
2105            .sqlite()
2106            .load_history(cid, 50)
2107            .await
2108            .unwrap();
2109
2110        assert_eq!(history.len(), 1);
2111        assert_eq!(history[0].role, Role::Assistant);
2112        assert_eq!(history[0].content, content);
2113        assert_eq!(history[0].parts.len(), 1);
2114        match &history[0].parts[0] {
2115            MessagePart::ToolUse { id, name, .. } => {
2116                assert_eq!(id, "call_abc123");
2117                assert_eq!(name, "read_file");
2118            }
2119            other => panic!("expected ToolUse part, got {other:?}"),
2120        }
2121        // Regression guard: assistant message must NOT have ToolResult parts
2122        assert!(
2123            !history[0]
2124                .parts
2125                .iter()
2126                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2127            "assistant message must not contain ToolResult parts"
2128        );
2129    }
2130
2131    #[tokio::test]
2132    async fn persist_message_saves_correct_tool_result_parts() {
2133        use zeph_llm::provider::MessagePart;
2134
2135        let provider = mock_provider(vec![]);
2136        let channel = MockChannel::new(vec![]);
2137        let registry = create_test_registry();
2138        let executor = MockToolExecutor::no_tools();
2139
2140        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2141        let cid = memory.sqlite().create_conversation().await.unwrap();
2142
2143        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2144            std::sync::Arc::new(memory),
2145            cid,
2146            50,
2147            5,
2148            100,
2149        );
2150
2151        let parts = vec![MessagePart::ToolResult {
2152            tool_use_id: "call_abc123".to_string(),
2153            content: "file contents here".to_string(),
2154            is_error: false,
2155        }];
2156        let content = "[tool_result: call_abc123]\nfile contents here";
2157
2158        agent
2159            .persist_message(Role::User, content, &parts, false)
2160            .await;
2161
2162        let history = agent
2163            .memory_state
2164            .persistence
2165            .memory
2166            .as_ref()
2167            .unwrap()
2168            .sqlite()
2169            .load_history(cid, 50)
2170            .await
2171            .unwrap();
2172
2173        assert_eq!(history.len(), 1);
2174        assert_eq!(history[0].role, Role::User);
2175        assert_eq!(history[0].content, content);
2176        assert_eq!(history[0].parts.len(), 1);
2177        match &history[0].parts[0] {
2178            MessagePart::ToolResult {
2179                tool_use_id,
2180                content: result_content,
2181                is_error,
2182            } => {
2183                assert_eq!(tool_use_id, "call_abc123");
2184                assert_eq!(result_content, "file contents here");
2185                assert!(!is_error);
2186            }
2187            other => panic!("expected ToolResult part, got {other:?}"),
2188        }
2189        // Regression guard: user message with ToolResult must NOT have ToolUse parts
2190        assert!(
2191            !history[0]
2192                .parts
2193                .iter()
2194                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2195            "user ToolResult message must not contain ToolUse parts"
2196        );
2197    }
2198
2199    #[tokio::test]
2200    async fn persist_message_roundtrip_preserves_role_part_alignment() {
2201        use zeph_llm::provider::MessagePart;
2202
2203        let provider = mock_provider(vec![]);
2204        let channel = MockChannel::new(vec![]);
2205        let registry = create_test_registry();
2206        let executor = MockToolExecutor::no_tools();
2207
2208        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2209        let cid = memory.sqlite().create_conversation().await.unwrap();
2210
2211        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2212            std::sync::Arc::new(memory),
2213            cid,
2214            50,
2215            5,
2216            100,
2217        );
2218
2219        // Persist assistant message with ToolUse parts
2220        let assistant_parts = vec![MessagePart::ToolUse {
2221            id: "id_1".to_string(),
2222            name: "list_dir".to_string(),
2223            input: serde_json::json!({"path": "/tmp"}),
2224        }];
2225        agent
2226            .persist_message(
2227                Role::Assistant,
2228                "[tool_use: list_dir(id_1)]",
2229                &assistant_parts,
2230                false,
2231            )
2232            .await;
2233
2234        // Persist user message with ToolResult parts
2235        let user_parts = vec![MessagePart::ToolResult {
2236            tool_use_id: "id_1".to_string(),
2237            content: "file1.txt\nfile2.txt".to_string(),
2238            is_error: false,
2239        }];
2240        agent
2241            .persist_message(
2242                Role::User,
2243                "[tool_result: id_1]\nfile1.txt\nfile2.txt",
2244                &user_parts,
2245                false,
2246            )
2247            .await;
2248
2249        let history = agent
2250            .memory_state
2251            .persistence
2252            .memory
2253            .as_ref()
2254            .unwrap()
2255            .sqlite()
2256            .load_history(cid, 50)
2257            .await
2258            .unwrap();
2259
2260        assert_eq!(history.len(), 2);
2261
2262        // First message: assistant + ToolUse
2263        assert_eq!(history[0].role, Role::Assistant);
2264        assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
2265        assert!(
2266            matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
2267            "first message must be assistant ToolUse"
2268        );
2269
2270        // Second message: user + ToolResult
2271        assert_eq!(history[1].role, Role::User);
2272        assert_eq!(
2273            history[1].content,
2274            "[tool_result: id_1]\nfile1.txt\nfile2.txt"
2275        );
2276        assert!(
2277            matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
2278            "second message must be user ToolResult"
2279        );
2280
2281        // Cross-role regression guard: no swapped parts
2282        assert!(
2283            !history[0]
2284                .parts
2285                .iter()
2286                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
2287            "assistant message must not have ToolResult parts"
2288        );
2289        assert!(
2290            !history[1]
2291                .parts
2292                .iter()
2293                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2294            "user message must not have ToolUse parts"
2295        );
2296    }
2297
2298    #[tokio::test]
2299    async fn persist_message_saves_correct_tool_output_parts() {
2300        use zeph_llm::provider::MessagePart;
2301
2302        let provider = mock_provider(vec![]);
2303        let channel = MockChannel::new(vec![]);
2304        let registry = create_test_registry();
2305        let executor = MockToolExecutor::no_tools();
2306
2307        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2308        let cid = memory.sqlite().create_conversation().await.unwrap();
2309
2310        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2311            std::sync::Arc::new(memory),
2312            cid,
2313            50,
2314            5,
2315            100,
2316        );
2317
2318        let parts = vec![MessagePart::ToolOutput {
2319            tool_name: "shell".into(),
2320            body: "hello from shell".to_string(),
2321            compacted_at: None,
2322        }];
2323        let content = "[tool: shell]\nhello from shell";
2324
2325        agent
2326            .persist_message(Role::User, content, &parts, false)
2327            .await;
2328
2329        let history = agent
2330            .memory_state
2331            .persistence
2332            .memory
2333            .as_ref()
2334            .unwrap()
2335            .sqlite()
2336            .load_history(cid, 50)
2337            .await
2338            .unwrap();
2339
2340        assert_eq!(history.len(), 1);
2341        assert_eq!(history[0].role, Role::User);
2342        assert_eq!(history[0].content, content);
2343        assert_eq!(history[0].parts.len(), 1);
2344        match &history[0].parts[0] {
2345            MessagePart::ToolOutput {
2346                tool_name,
2347                body,
2348                compacted_at,
2349            } => {
2350                assert_eq!(tool_name, "shell");
2351                assert_eq!(body, "hello from shell");
2352                assert!(compacted_at.is_none());
2353            }
2354            other => panic!("expected ToolOutput part, got {other:?}"),
2355        }
2356    }
2357
    // --- sanitize_tool_pairs tests (exercised through load_history) ---
2359
2360    #[tokio::test]
2361    async fn load_history_removes_trailing_orphan_tool_use() {
2362        use zeph_llm::provider::MessagePart;
2363
2364        let provider = mock_provider(vec![]);
2365        let channel = MockChannel::new(vec![]);
2366        let registry = create_test_registry();
2367        let executor = MockToolExecutor::no_tools();
2368
2369        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2370        let cid = memory.sqlite().create_conversation().await.unwrap();
2371        let sqlite = memory.sqlite();
2372
2373        // user message (normal)
2374        sqlite
2375            .save_message(cid, "user", "do something with a tool")
2376            .await
2377            .unwrap();
2378
2379        // assistant message with ToolUse parts — no following tool_result (orphan)
2380        let parts = serde_json::to_string(&[MessagePart::ToolUse {
2381            id: "call_orphan".to_string(),
2382            name: "shell".to_string(),
2383            input: serde_json::json!({"command": "ls"}),
2384        }])
2385        .unwrap();
2386        sqlite
2387            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2388            .await
2389            .unwrap();
2390
2391        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2392            std::sync::Arc::new(memory),
2393            cid,
2394            50,
2395            5,
2396            100,
2397        );
2398
2399        let messages_before = agent.msg.messages.len();
2400        agent.load_history().await.unwrap();
2401
2402        // Only the user message should be loaded; orphaned assistant tool_use removed.
2403        assert_eq!(
2404            agent.msg.messages.len(),
2405            messages_before + 1,
2406            "orphaned trailing tool_use must be removed"
2407        );
2408        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2409    }
2410
2411    #[tokio::test]
2412    async fn load_history_removes_leading_orphan_tool_result() {
2413        use zeph_llm::provider::MessagePart;
2414
2415        let provider = mock_provider(vec![]);
2416        let channel = MockChannel::new(vec![]);
2417        let registry = create_test_registry();
2418        let executor = MockToolExecutor::no_tools();
2419
2420        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2421        let cid = memory.sqlite().create_conversation().await.unwrap();
2422        let sqlite = memory.sqlite();
2423
2424        // Leading orphan: user message with ToolResult but no preceding tool_use
2425        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2426            tool_use_id: "call_missing".to_string(),
2427            content: "result data".to_string(),
2428            is_error: false,
2429        }])
2430        .unwrap();
2431        sqlite
2432            .save_message_with_parts(
2433                cid,
2434                "user",
2435                "[tool_result: call_missing]\nresult data",
2436                &result_parts,
2437            )
2438            .await
2439            .unwrap();
2440
2441        // A valid assistant reply after the orphan
2442        sqlite
2443            .save_message(cid, "assistant", "here is my response")
2444            .await
2445            .unwrap();
2446
2447        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2448            std::sync::Arc::new(memory),
2449            cid,
2450            50,
2451            5,
2452            100,
2453        );
2454
2455        let messages_before = agent.msg.messages.len();
2456        agent.load_history().await.unwrap();
2457
2458        // Orphaned leading tool_result removed; only assistant message kept.
2459        assert_eq!(
2460            agent.msg.messages.len(),
2461            messages_before + 1,
2462            "orphaned leading tool_result must be removed"
2463        );
2464        assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2465    }
2466
2467    #[tokio::test]
2468    async fn load_history_preserves_complete_tool_pairs() {
2469        use zeph_llm::provider::MessagePart;
2470
2471        let provider = mock_provider(vec![]);
2472        let channel = MockChannel::new(vec![]);
2473        let registry = create_test_registry();
2474        let executor = MockToolExecutor::no_tools();
2475
2476        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2477        let cid = memory.sqlite().create_conversation().await.unwrap();
2478        let sqlite = memory.sqlite();
2479
2480        // Complete tool_use / tool_result pair
2481        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2482            id: "call_ok".to_string(),
2483            name: "shell".to_string(),
2484            input: serde_json::json!({"command": "pwd"}),
2485        }])
2486        .unwrap();
2487        sqlite
2488            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2489            .await
2490            .unwrap();
2491
2492        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2493            tool_use_id: "call_ok".to_string(),
2494            content: "/home/user".to_string(),
2495            is_error: false,
2496        }])
2497        .unwrap();
2498        sqlite
2499            .save_message_with_parts(
2500                cid,
2501                "user",
2502                "[tool_result: call_ok]\n/home/user",
2503                &result_parts,
2504            )
2505            .await
2506            .unwrap();
2507
2508        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2509            std::sync::Arc::new(memory),
2510            cid,
2511            50,
2512            5,
2513            100,
2514        );
2515
2516        let messages_before = agent.msg.messages.len();
2517        agent.load_history().await.unwrap();
2518
2519        // Both messages must be preserved.
2520        assert_eq!(
2521            agent.msg.messages.len(),
2522            messages_before + 2,
2523            "complete tool_use/tool_result pair must be preserved"
2524        );
2525        assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2526        assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2527    }
2528
2529    #[tokio::test]
2530    async fn load_history_handles_multiple_trailing_orphans() {
2531        use zeph_llm::provider::MessagePart;
2532
2533        let provider = mock_provider(vec![]);
2534        let channel = MockChannel::new(vec![]);
2535        let registry = create_test_registry();
2536        let executor = MockToolExecutor::no_tools();
2537
2538        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2539        let cid = memory.sqlite().create_conversation().await.unwrap();
2540        let sqlite = memory.sqlite();
2541
2542        // Normal user message
2543        sqlite.save_message(cid, "user", "start").await.unwrap();
2544
2545        // First orphaned tool_use
2546        let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2547            id: "call_1".to_string(),
2548            name: "shell".to_string(),
2549            input: serde_json::json!({}),
2550        }])
2551        .unwrap();
2552        sqlite
2553            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2554            .await
2555            .unwrap();
2556
2557        // Second orphaned tool_use (consecutive, no tool_result between them)
2558        let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2559            id: "call_2".to_string(),
2560            name: "read_file".to_string(),
2561            input: serde_json::json!({}),
2562        }])
2563        .unwrap();
2564        sqlite
2565            .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2566            .await
2567            .unwrap();
2568
2569        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2570            std::sync::Arc::new(memory),
2571            cid,
2572            50,
2573            5,
2574            100,
2575        );
2576
2577        let messages_before = agent.msg.messages.len();
2578        agent.load_history().await.unwrap();
2579
2580        // Both orphaned tool_use messages removed; only the user message kept.
2581        assert_eq!(
2582            agent.msg.messages.len(),
2583            messages_before + 1,
2584            "all trailing orphaned tool_use messages must be removed"
2585        );
2586        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2587    }
2588
2589    #[tokio::test]
2590    async fn load_history_no_tool_messages_unchanged() {
2591        let provider = mock_provider(vec![]);
2592        let channel = MockChannel::new(vec![]);
2593        let registry = create_test_registry();
2594        let executor = MockToolExecutor::no_tools();
2595
2596        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2597        let cid = memory.sqlite().create_conversation().await.unwrap();
2598        let sqlite = memory.sqlite();
2599
2600        sqlite.save_message(cid, "user", "hello").await.unwrap();
2601        sqlite
2602            .save_message(cid, "assistant", "hi there")
2603            .await
2604            .unwrap();
2605        sqlite
2606            .save_message(cid, "user", "how are you?")
2607            .await
2608            .unwrap();
2609
2610        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2611            std::sync::Arc::new(memory),
2612            cid,
2613            50,
2614            5,
2615            100,
2616        );
2617
2618        let messages_before = agent.msg.messages.len();
2619        agent.load_history().await.unwrap();
2620
2621        // All three plain messages must be preserved.
2622        assert_eq!(
2623            agent.msg.messages.len(),
2624            messages_before + 3,
2625            "plain messages without tool parts must pass through unchanged"
2626        );
2627    }
2628
2629    #[tokio::test]
2630    async fn load_history_removes_both_leading_and_trailing_orphans() {
2631        use zeph_llm::provider::MessagePart;
2632
2633        let provider = mock_provider(vec![]);
2634        let channel = MockChannel::new(vec![]);
2635        let registry = create_test_registry();
2636        let executor = MockToolExecutor::no_tools();
2637
2638        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2639        let cid = memory.sqlite().create_conversation().await.unwrap();
2640        let sqlite = memory.sqlite();
2641
2642        // Leading orphan: user message with ToolResult, no preceding tool_use
2643        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2644            tool_use_id: "call_leading".to_string(),
2645            content: "orphaned result".to_string(),
2646            is_error: false,
2647        }])
2648        .unwrap();
2649        sqlite
2650            .save_message_with_parts(
2651                cid,
2652                "user",
2653                "[tool_result: call_leading]\norphaned result",
2654                &result_parts,
2655            )
2656            .await
2657            .unwrap();
2658
2659        // Valid middle messages
2660        sqlite
2661            .save_message(cid, "user", "what is 2+2?")
2662            .await
2663            .unwrap();
2664        sqlite.save_message(cid, "assistant", "4").await.unwrap();
2665
2666        // Trailing orphan: assistant message with ToolUse, no following tool_result
2667        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2668            id: "call_trailing".to_string(),
2669            name: "shell".to_string(),
2670            input: serde_json::json!({"command": "date"}),
2671        }])
2672        .unwrap();
2673        sqlite
2674            .save_message_with_parts(
2675                cid,
2676                "assistant",
2677                "[tool_use: shell(call_trailing)]",
2678                &use_parts,
2679            )
2680            .await
2681            .unwrap();
2682
2683        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2684            std::sync::Arc::new(memory),
2685            cid,
2686            50,
2687            5,
2688            100,
2689        );
2690
2691        let messages_before = agent.msg.messages.len();
2692        agent.load_history().await.unwrap();
2693
2694        // Both orphans removed; only the 2 valid middle messages kept.
2695        assert_eq!(
2696            agent.msg.messages.len(),
2697            messages_before + 2,
2698            "both leading and trailing orphans must be removed"
2699        );
2700        assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2701        assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2702        assert_eq!(
2703            agent.msg.messages[messages_before + 1].role,
2704            Role::Assistant
2705        );
2706        assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2707    }
2708
2709    /// RC1 regression: mid-history assistant[`ToolUse`] without a following user[`ToolResult`]
2710    /// must have its `ToolUse` parts stripped (text preserved). The message count stays the same
2711    /// because the assistant message has a text content fallback; only `ToolUse` parts are
2712    /// removed.
2713    #[tokio::test]
2714    async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2715        use zeph_llm::provider::MessagePart;
2716
2717        let provider = mock_provider(vec![]);
2718        let channel = MockChannel::new(vec![]);
2719        let registry = create_test_registry();
2720        let executor = MockToolExecutor::no_tools();
2721
2722        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2723        let cid = memory.sqlite().create_conversation().await.unwrap();
2724        let sqlite = memory.sqlite();
2725
2726        // Normal first exchange.
2727        sqlite
2728            .save_message(cid, "user", "first question")
2729            .await
2730            .unwrap();
2731        sqlite
2732            .save_message(cid, "assistant", "first answer")
2733            .await
2734            .unwrap();
2735
2736        // Mid-history orphan: assistant with ToolUse but NO following ToolResult user message.
2737        // This models the compaction-split scenario (RC2) where replace_conversation hid the
2738        // user[ToolResult] but left the assistant[ToolUse] visible.
2739        let use_parts = serde_json::to_string(&[
2740            MessagePart::ToolUse {
2741                id: "call_mid_1".to_string(),
2742                name: "shell".to_string(),
2743                input: serde_json::json!({"command": "ls"}),
2744            },
2745            MessagePart::Text {
2746                text: "Let me check the files.".to_string(),
2747            },
2748        ])
2749        .unwrap();
2750        sqlite
2751            .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2752            .await
2753            .unwrap();
2754
2755        // Another normal exchange after the orphan.
2756        sqlite
2757            .save_message(cid, "user", "second question")
2758            .await
2759            .unwrap();
2760        sqlite
2761            .save_message(cid, "assistant", "second answer")
2762            .await
2763            .unwrap();
2764
2765        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2766            std::sync::Arc::new(memory),
2767            cid,
2768            50,
2769            5,
2770            100,
2771        );
2772
2773        let messages_before = agent.msg.messages.len();
2774        agent.load_history().await.unwrap();
2775
2776        // All 5 messages remain (orphan message kept because it has text), but the orphaned
2777        // message must have its ToolUse parts stripped.
2778        assert_eq!(
2779            agent.msg.messages.len(),
2780            messages_before + 5,
2781            "message count must be 5 (orphan message kept — has text content)"
2782        );
2783
2784        // The orphaned assistant message (index 2 in the loaded slice) must have no ToolUse parts.
2785        let orphan = &agent.msg.messages[messages_before + 2];
2786        assert_eq!(orphan.role, Role::Assistant);
2787        assert!(
2788            !orphan
2789                .parts
2790                .iter()
2791                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2792            "orphaned ToolUse parts must be stripped from mid-history message"
2793        );
2794        // Text part must be preserved.
2795        assert!(
2796            orphan.parts.iter().any(
2797                |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2798            ),
2799            "text content of orphaned assistant message must be preserved"
2800        );
2801    }
2802
2803    /// RC3 regression: a user message with empty `content` but non-empty `parts` (`ToolResult`)
2804    /// must NOT be skipped by `load_history`. Previously the empty-content check dropped these
2805    /// messages before `sanitize_tool_pairs` ran, leaving the preceding assistant `ToolUse`
2806    /// orphaned.
2807    #[tokio::test]
2808    async fn load_history_keeps_tool_only_user_message() {
2809        use zeph_llm::provider::MessagePart;
2810
2811        let provider = mock_provider(vec![]);
2812        let channel = MockChannel::new(vec![]);
2813        let registry = create_test_registry();
2814        let executor = MockToolExecutor::no_tools();
2815
2816        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2817        let cid = memory.sqlite().create_conversation().await.unwrap();
2818        let sqlite = memory.sqlite();
2819
2820        // Assistant sends a ToolUse.
2821        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2822            id: "call_rc3".to_string(),
2823            name: "memory_save".to_string(),
2824            input: serde_json::json!({"content": "something"}),
2825        }])
2826        .unwrap();
2827        sqlite
2828            .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2829            .await
2830            .unwrap();
2831
2832        // User message has empty text content but carries ToolResult in parts — native tool pattern.
2833        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2834            tool_use_id: "call_rc3".to_string(),
2835            content: "saved".to_string(),
2836            is_error: false,
2837        }])
2838        .unwrap();
2839        sqlite
2840            .save_message_with_parts(cid, "user", "", &result_parts)
2841            .await
2842            .unwrap();
2843
2844        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2845
2846        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2847            std::sync::Arc::new(memory),
2848            cid,
2849            50,
2850            5,
2851            100,
2852        );
2853
2854        let messages_before = agent.msg.messages.len();
2855        agent.load_history().await.unwrap();
2856
2857        // All 3 messages must be loaded — the empty-content ToolResult user message must NOT be
2858        // dropped.
2859        assert_eq!(
2860            agent.msg.messages.len(),
2861            messages_before + 3,
2862            "user message with empty content but ToolResult parts must not be dropped"
2863        );
2864
2865        // The user message at index 1 must still carry the ToolResult part.
2866        let user_msg = &agent.msg.messages[messages_before + 1];
2867        assert_eq!(user_msg.role, Role::User);
2868        assert!(
2869            user_msg.parts.iter().any(
2870                |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2871            ),
2872            "ToolResult part must be preserved on user message with empty content"
2873        );
2874    }
2875
2876    /// RC2 reverse pass: a user message with a `ToolResult` whose `tool_use_id` has no matching
2877    /// `ToolUse` in the preceding assistant message must be stripped by
2878    /// `strip_mid_history_orphans`.
2879    #[tokio::test]
2880    async fn strip_orphans_removes_orphaned_tool_result() {
2881        use zeph_llm::provider::MessagePart;
2882
2883        let provider = mock_provider(vec![]);
2884        let channel = MockChannel::new(vec![]);
2885        let registry = create_test_registry();
2886        let executor = MockToolExecutor::no_tools();
2887
2888        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2889        let cid = memory.sqlite().create_conversation().await.unwrap();
2890        let sqlite = memory.sqlite();
2891
2892        // Normal exchange before the orphan.
2893        sqlite.save_message(cid, "user", "hello").await.unwrap();
2894        sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2895
2896        // Assistant message that does NOT contain a ToolUse.
2897        sqlite
2898            .save_message(cid, "assistant", "plain answer")
2899            .await
2900            .unwrap();
2901
2902        // User message references a tool_use_id that was never sent by the preceding assistant.
2903        let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2904            tool_use_id: "call_nonexistent".to_string(),
2905            content: "stale result".to_string(),
2906            is_error: false,
2907        }])
2908        .unwrap();
2909        sqlite
2910            .save_message_with_parts(
2911                cid,
2912                "user",
2913                "[tool_result: call_nonexistent]\nstale result",
2914                &orphan_result_parts,
2915            )
2916            .await
2917            .unwrap();
2918
2919        sqlite
2920            .save_message(cid, "assistant", "final")
2921            .await
2922            .unwrap();
2923
2924        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2925            std::sync::Arc::new(memory),
2926            cid,
2927            50,
2928            5,
2929            100,
2930        );
2931
2932        let messages_before = agent.msg.messages.len();
2933        agent.load_history().await.unwrap();
2934
2935        // The orphaned ToolResult part must have been stripped from the user message.
2936        // The user message itself may be removed (parts empty + content non-empty) or kept with
2937        // the text content — but it must NOT retain the orphaned ToolResult part.
2938        let loaded = &agent.msg.messages[messages_before..];
2939        for msg in loaded {
2940            assert!(
2941                !msg.parts.iter().any(|p| matches!(
2942                    p,
2943                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2944                )),
2945                "orphaned ToolResult part must be stripped from history"
2946            );
2947        }
2948    }
2949
2950    /// RC2 reverse pass: a complete `tool_use` + `tool_result` pair must pass through the reverse
2951    /// orphan check intact; the fix must not strip valid `ToolResult` parts.
2952    #[tokio::test]
2953    async fn strip_orphans_keeps_complete_pair() {
2954        use zeph_llm::provider::MessagePart;
2955
2956        let provider = mock_provider(vec![]);
2957        let channel = MockChannel::new(vec![]);
2958        let registry = create_test_registry();
2959        let executor = MockToolExecutor::no_tools();
2960
2961        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2962        let cid = memory.sqlite().create_conversation().await.unwrap();
2963        let sqlite = memory.sqlite();
2964
2965        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2966            id: "call_valid".to_string(),
2967            name: "shell".to_string(),
2968            input: serde_json::json!({"command": "ls"}),
2969        }])
2970        .unwrap();
2971        sqlite
2972            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2973            .await
2974            .unwrap();
2975
2976        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2977            tool_use_id: "call_valid".to_string(),
2978            content: "file.rs".to_string(),
2979            is_error: false,
2980        }])
2981        .unwrap();
2982        sqlite
2983            .save_message_with_parts(cid, "user", "", &result_parts)
2984            .await
2985            .unwrap();
2986
2987        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2988            std::sync::Arc::new(memory),
2989            cid,
2990            50,
2991            5,
2992            100,
2993        );
2994
2995        let messages_before = agent.msg.messages.len();
2996        agent.load_history().await.unwrap();
2997
2998        assert_eq!(
2999            agent.msg.messages.len(),
3000            messages_before + 2,
3001            "complete tool_use/tool_result pair must be preserved"
3002        );
3003
3004        let user_msg = &agent.msg.messages[messages_before + 1];
3005        assert!(
3006            user_msg.parts.iter().any(|p| matches!(
3007                p,
3008                MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
3009            )),
3010            "ToolResult part for a matched tool_use must not be stripped"
3011        );
3012    }
3013
3014    /// RC2 reverse pass: history with a mix of complete pairs and orphaned `ToolResult` messages.
3015    /// Orphaned `ToolResult` parts must be stripped; complete pairs must pass through intact.
3016    #[tokio::test]
3017    async fn strip_orphans_mixed_history() {
3018        use zeph_llm::provider::MessagePart;
3019
3020        let provider = mock_provider(vec![]);
3021        let channel = MockChannel::new(vec![]);
3022        let registry = create_test_registry();
3023        let executor = MockToolExecutor::no_tools();
3024
3025        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3026        let cid = memory.sqlite().create_conversation().await.unwrap();
3027        let sqlite = memory.sqlite();
3028
3029        // First: complete tool_use / tool_result pair.
3030        let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
3031            id: "call_good".to_string(),
3032            name: "shell".to_string(),
3033            input: serde_json::json!({"command": "pwd"}),
3034        }])
3035        .unwrap();
3036        sqlite
3037            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
3038            .await
3039            .unwrap();
3040
3041        let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
3042            tool_use_id: "call_good".to_string(),
3043            content: "/home".to_string(),
3044            is_error: false,
3045        }])
3046        .unwrap();
3047        sqlite
3048            .save_message_with_parts(cid, "user", "", &result_parts_ok)
3049            .await
3050            .unwrap();
3051
3052        // Second: plain assistant message followed by an orphaned ToolResult user message.
3053        sqlite
3054            .save_message(cid, "assistant", "text only")
3055            .await
3056            .unwrap();
3057
3058        let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
3059            tool_use_id: "call_ghost".to_string(),
3060            content: "ghost result".to_string(),
3061            is_error: false,
3062        }])
3063        .unwrap();
3064        sqlite
3065            .save_message_with_parts(
3066                cid,
3067                "user",
3068                "[tool_result: call_ghost]\nghost result",
3069                &orphan_parts,
3070            )
3071            .await
3072            .unwrap();
3073
3074        sqlite
3075            .save_message(cid, "assistant", "final reply")
3076            .await
3077            .unwrap();
3078
3079        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3080            std::sync::Arc::new(memory),
3081            cid,
3082            50,
3083            5,
3084            100,
3085        );
3086
3087        let messages_before = agent.msg.messages.len();
3088        agent.load_history().await.unwrap();
3089
3090        let loaded = &agent.msg.messages[messages_before..];
3091
3092        // The orphaned ToolResult part must not appear in any message.
3093        for msg in loaded {
3094            assert!(
3095                !msg.parts.iter().any(|p| matches!(
3096                    p,
3097                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
3098                )),
3099                "orphaned ToolResult (call_ghost) must be stripped from history"
3100            );
3101        }
3102
3103        // The matched ToolResult must still be present on the user message following the
3104        // first assistant message.
3105        let has_good_result = loaded.iter().any(|msg| {
3106            msg.role == Role::User
3107                && msg.parts.iter().any(|p| {
3108                    matches!(
3109                        p,
3110                        MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
3111                    )
3112                })
3113        });
3114        assert!(
3115            has_good_result,
3116            "matched ToolResult (call_good) must be preserved in history"
3117        );
3118    }
3119
3120    /// Regression: a properly matched `tool_use`/`tool_result` pair must NOT be touched by the
3121    /// mid-history scan — ensures the fix doesn't break valid tool exchanges.
3122    #[tokio::test]
3123    async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
3124        use zeph_llm::provider::MessagePart;
3125
3126        let provider = mock_provider(vec![]);
3127        let channel = MockChannel::new(vec![]);
3128        let registry = create_test_registry();
3129        let executor = MockToolExecutor::no_tools();
3130
3131        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3132        let cid = memory.sqlite().create_conversation().await.unwrap();
3133        let sqlite = memory.sqlite();
3134
3135        sqlite
3136            .save_message(cid, "user", "run a command")
3137            .await
3138            .unwrap();
3139
3140        // Assistant sends a ToolUse.
3141        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3142            id: "call_ok".to_string(),
3143            name: "shell".to_string(),
3144            input: serde_json::json!({"command": "echo hi"}),
3145        }])
3146        .unwrap();
3147        sqlite
3148            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
3149            .await
3150            .unwrap();
3151
3152        // Matching user ToolResult follows.
3153        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3154            tool_use_id: "call_ok".to_string(),
3155            content: "hi".to_string(),
3156            is_error: false,
3157        }])
3158        .unwrap();
3159        sqlite
3160            .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
3161            .await
3162            .unwrap();
3163
3164        sqlite.save_message(cid, "assistant", "done").await.unwrap();
3165
3166        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3167            std::sync::Arc::new(memory),
3168            cid,
3169            50,
3170            5,
3171            100,
3172        );
3173
3174        let messages_before = agent.msg.messages.len();
3175        agent.load_history().await.unwrap();
3176
3177        // All 4 messages preserved, tool_use parts intact.
3178        assert_eq!(
3179            agent.msg.messages.len(),
3180            messages_before + 4,
3181            "matched tool pair must not be removed"
3182        );
3183        let tool_msg = &agent.msg.messages[messages_before + 1];
3184        assert!(
3185            tool_msg
3186                .parts
3187                .iter()
3188                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
3189            "matched ToolUse parts must be preserved"
3190        );
3191    }
3192
3193    /// RC5: `persist_cancelled_tool_results` must persist a tombstone user message containing
3194    /// `is_error=true` `ToolResult` parts for all `tool_calls` IDs so the preceding assistant
3195    /// `ToolUse` is never orphaned in the DB after a cancellation.
3196    #[tokio::test]
3197    async fn persist_cancelled_tool_results_pairs_tool_use() {
3198        use zeph_llm::provider::MessagePart;
3199
3200        let provider = mock_provider(vec![]);
3201        let channel = MockChannel::new(vec![]);
3202        let registry = create_test_registry();
3203        let executor = MockToolExecutor::no_tools();
3204
3205        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3206        let cid = memory.sqlite().create_conversation().await.unwrap();
3207
3208        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3209            std::sync::Arc::new(memory),
3210            cid,
3211            50,
3212            5,
3213            100,
3214        );
3215
3216        // Simulate: assistant message with two ToolUse parts already persisted.
3217        let tool_calls = vec![
3218            zeph_llm::provider::ToolUseRequest {
3219                id: "cancel_id_1".to_string(),
3220                name: "shell".to_string().into(),
3221                input: serde_json::json!({}),
3222            },
3223            zeph_llm::provider::ToolUseRequest {
3224                id: "cancel_id_2".to_string(),
3225                name: "read_file".to_string().into(),
3226                input: serde_json::json!({}),
3227            },
3228        ];
3229
3230        agent.persist_cancelled_tool_results(&tool_calls).await;
3231
3232        let history = agent
3233            .memory_state
3234            .persistence
3235            .memory
3236            .as_ref()
3237            .unwrap()
3238            .sqlite()
3239            .load_history(cid, 50)
3240            .await
3241            .unwrap();
3242
3243        // Exactly one user message must have been persisted.
3244        assert_eq!(history.len(), 1);
3245        assert_eq!(history[0].role, Role::User);
3246
3247        // It must contain is_error=true ToolResult for each tool call ID.
3248        for tc in &tool_calls {
3249            assert!(
3250                history[0].parts.iter().any(|p| matches!(
3251                    p,
3252                    MessagePart::ToolResult { tool_use_id, is_error, .. }
3253                        if tool_use_id == &tc.id && *is_error
3254                )),
3255                "tombstone ToolResult for {} must be present and is_error=true",
3256                tc.id
3257            );
3258        }
3259    }
3260
3261    // ---- has_meaningful_content unit tests ----
3262
3263    #[test]
3264    fn meaningful_content_empty_string() {
3265        assert!(!has_meaningful_content(""));
3266    }
3267
3268    #[test]
3269    fn meaningful_content_whitespace_only() {
3270        assert!(!has_meaningful_content("   \n\t  "));
3271    }
3272
3273    #[test]
3274    fn meaningful_content_tool_use_only() {
3275        assert!(!has_meaningful_content("[tool_use: shell(call_1)]"));
3276    }
3277
3278    #[test]
3279    fn meaningful_content_tool_use_no_parens() {
3280        // Format produced when tool_use is stored without explicit id parens.
3281        assert!(!has_meaningful_content("[tool_use: memory_save]"));
3282    }
3283
3284    #[test]
3285    fn meaningful_content_tool_result_with_body() {
3286        assert!(!has_meaningful_content(
3287            "[tool_result: call_1]\nsome output here"
3288        ));
3289    }
3290
3291    #[test]
3292    fn meaningful_content_tool_result_empty_body() {
3293        assert!(!has_meaningful_content("[tool_result: call_1]\n"));
3294    }
3295
3296    #[test]
3297    fn meaningful_content_tool_output_inline() {
3298        assert!(!has_meaningful_content("[tool output: bash] some result"));
3299    }
3300
3301    #[test]
3302    fn meaningful_content_tool_output_pruned() {
3303        assert!(!has_meaningful_content("[tool output: bash] (pruned)"));
3304    }
3305
3306    #[test]
3307    fn meaningful_content_tool_output_fenced() {
3308        assert!(!has_meaningful_content(
3309            "[tool output: bash]\n```\nls output\n```"
3310        ));
3311    }
3312
3313    #[test]
3314    fn meaningful_content_multiple_tool_use_tags() {
3315        assert!(!has_meaningful_content(
3316            "[tool_use: bash(id1)][tool_use: read(id2)]"
3317        ));
3318    }
3319
3320    #[test]
3321    fn meaningful_content_multiple_tool_use_tags_space_separator() {
3322        // Space between tags is not meaningful content.
3323        assert!(!has_meaningful_content(
3324            "[tool_use: bash(id1)] [tool_use: read(id2)]"
3325        ));
3326    }
3327
3328    #[test]
3329    fn meaningful_content_multiple_tool_use_tags_newline_separator() {
3330        // Newline-only separator is also not meaningful.
3331        assert!(!has_meaningful_content(
3332            "[tool_use: bash(id1)]\n[tool_use: read(id2)]"
3333        ));
3334    }
3335
3336    #[test]
3337    fn meaningful_content_tool_result_followed_by_tool_use() {
3338        // Two tags in sequence — no real text between them.
3339        assert!(!has_meaningful_content(
3340            "[tool_result: call_1]\nresult\n[tool_use: bash(call_2)]"
3341        ));
3342    }
3343
3344    #[test]
3345    fn meaningful_content_real_text_only() {
3346        assert!(has_meaningful_content("Hello, how can I help you?"));
3347    }
3348
3349    #[test]
3350    fn meaningful_content_text_before_tool_tag() {
3351        assert!(has_meaningful_content("Let me check. [tool_use: bash(id)]"));
3352    }
3353
3354    #[test]
3355    fn meaningful_content_text_after_tool_use_tag() {
3356        // Text appearing after a [tool_use: name] tag (without parens) is a JSON body
3357        // in the request-builder format — but since that format never reaches the DB,
3358        // this test verifies conservative behavior: the helper returns true (do not delete).
3359        assert!(has_meaningful_content("[tool_use: bash] I ran the command"));
3360    }
3361
3362    #[test]
3363    fn meaningful_content_text_between_tags() {
3364        assert!(has_meaningful_content(
3365            "[tool_use: bash(id1)]\nand then\n[tool_use: read(id2)]"
3366        ));
3367    }
3368
3369    #[test]
3370    fn meaningful_content_malformed_tag_no_closing_bracket() {
3371        // Conservative: malformed tag must not trigger delete.
3372        assert!(has_meaningful_content("[tool_use: "));
3373    }
3374
3375    #[test]
3376    fn meaningful_content_tool_use_and_tool_result_only() {
3377        // Typical persisted assistant+user pair content with no extra text.
3378        assert!(!has_meaningful_content(
3379            "[tool_use: memory_save(call_abc)]\n[tool_result: call_abc]\nsaved"
3380        ));
3381    }
3382
3383    #[test]
3384    fn meaningful_content_tool_result_body_with_json_array() {
3385        assert!(!has_meaningful_content(
3386            "[tool_result: id1]\n[\"array\", \"value\"]"
3387        ));
3388    }
3389
3390    // ---- Integration tests for the #2529 fix: soft-delete of legacy-content orphans ----
3391
3392    /// #2529 regression: orphaned assistant `ToolUse` + user `ToolResult` pair where BOTH messages
3393    /// have content consisting solely of legacy tool bracket strings (no human-readable text).
3394    ///
3395    /// Before the fix, `content.trim().is_empty()` returned false for these messages, so they
3396    /// were never soft-deleted and the WARN log repeated on every session restart.
3397    ///
3398    /// After the fix, `has_meaningful_content` returns false for legacy-only content, so both
3399    /// orphaned messages are soft-deleted (non-null `deleted_at`) in `SQLite`.
3400    #[tokio::test]
3401    async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
3402        use zeph_llm::provider::MessagePart;
3403
3404        let provider = mock_provider(vec![]);
3405        let channel = MockChannel::new(vec![]);
3406        let registry = create_test_registry();
3407        let executor = MockToolExecutor::no_tools();
3408
3409        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3410        let cid = memory.sqlite().create_conversation().await.unwrap();
3411        let sqlite = memory.sqlite();
3412
3413        // A normal user message that anchors the conversation.
3414        sqlite
3415            .save_message(cid, "user", "save this for me")
3416            .await
3417            .unwrap();
3418
3419        // Orphaned assistant[ToolUse]: content is ONLY a legacy tool tag — no matching
3420        // ToolResult follows. This is the exact pattern that triggered #2529.
3421        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3422            id: "call_2529".to_string(),
3423            name: "memory_save".to_string(),
3424            input: serde_json::json!({"content": "save this"}),
3425        }])
3426        .unwrap();
3427        let orphan_assistant_id = sqlite
3428            .save_message_with_parts(
3429                cid,
3430                "assistant",
3431                "[tool_use: memory_save(call_2529)]",
3432                &use_parts,
3433            )
3434            .await
3435            .unwrap();
3436
3437        // Orphaned user[ToolResult]: content is ONLY a legacy tool tag + body.
3438        // Its tool_use_id ("call_2529") does not match any ToolUse in the preceding assistant
3439        // message in this position (will be made orphaned by inserting a plain assistant message
3440        // between them to break the pair).
3441        sqlite
3442            .save_message(cid, "assistant", "here is a plain reply")
3443            .await
3444            .unwrap();
3445
3446        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3447            tool_use_id: "call_2529".to_string(),
3448            content: "saved".to_string(),
3449            is_error: false,
3450        }])
3451        .unwrap();
3452        let orphan_user_id = sqlite
3453            .save_message_with_parts(
3454                cid,
3455                "user",
3456                "[tool_result: call_2529]\nsaved",
3457                &result_parts,
3458            )
3459            .await
3460            .unwrap();
3461
3462        // Final plain message so history doesn't end on the orphan.
3463        sqlite.save_message(cid, "assistant", "done").await.unwrap();
3464
3465        let memory_arc = std::sync::Arc::new(memory);
3466        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3467            memory_arc.clone(),
3468            cid,
3469            50,
3470            5,
3471            100,
3472        );
3473
3474        agent.load_history().await.unwrap();
3475
3476        // Verify that both orphaned messages now have `deleted_at IS NOT NULL` in SQLite.
3477        // COUNT(*) WHERE deleted_at IS NOT NULL returns 1 if soft-deleted, 0 otherwise.
3478        let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3479            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3480        )
3481        .bind(orphan_assistant_id)
3482        .fetch_all(memory_arc.sqlite().pool())
3483        .await
3484        .unwrap();
3485
3486        let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3487            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3488        )
3489        .bind(orphan_user_id)
3490        .fetch_all(memory_arc.sqlite().pool())
3491        .await
3492        .unwrap();
3493
3494        assert_eq!(
3495            assistant_deleted_count.first().copied().unwrap_or(0),
3496            1,
3497            "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3498        );
3499        assert_eq!(
3500            user_deleted_count.first().copied().unwrap_or(0),
3501            1,
3502            "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3503        );
3504    }
3505
3506    /// #2529 idempotency: after soft-delete on first `load_history`, a second call must not
3507    /// re-load the soft-deleted orphans. This ensures the WARN log does not repeat on the
3508    /// next session startup for the same orphaned messages.
3509    #[tokio::test]
3510    async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3511        use zeph_llm::provider::MessagePart;
3512
3513        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3514        let cid = memory.sqlite().create_conversation().await.unwrap();
3515        let sqlite = memory.sqlite();
3516
3517        // Normal anchor message.
3518        sqlite
3519            .save_message(cid, "user", "do something")
3520            .await
3521            .unwrap();
3522
3523        // Orphaned assistant[ToolUse] with legacy-only content.
3524        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3525            id: "call_idem".to_string(),
3526            name: "shell".to_string(),
3527            input: serde_json::json!({"command": "ls"}),
3528        }])
3529        .unwrap();
3530        sqlite
3531            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3532            .await
3533            .unwrap();
3534
3535        // Break the pair: insert a plain assistant message so the ToolUse is mid-history orphan.
3536        sqlite
3537            .save_message(cid, "assistant", "continuing")
3538            .await
3539            .unwrap();
3540
3541        // Orphaned user[ToolResult] with legacy-only content.
3542        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3543            tool_use_id: "call_idem".to_string(),
3544            content: "output".to_string(),
3545            is_error: false,
3546        }])
3547        .unwrap();
3548        sqlite
3549            .save_message_with_parts(
3550                cid,
3551                "user",
3552                "[tool_result: call_idem]\noutput",
3553                &result_parts,
3554            )
3555            .await
3556            .unwrap();
3557
3558        sqlite
3559            .save_message(cid, "assistant", "final")
3560            .await
3561            .unwrap();
3562
3563        let memory_arc = std::sync::Arc::new(memory);
3564
3565        // First session: load_history performs soft-delete of the orphaned pair.
3566        let mut agent1 = Agent::new(
3567            mock_provider(vec![]),
3568            MockChannel::new(vec![]),
3569            create_test_registry(),
3570            None,
3571            5,
3572            MockToolExecutor::no_tools(),
3573        )
3574        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3575        agent1.load_history().await.unwrap();
3576        let count_after_first = agent1.msg.messages.len();
3577
3578        // Second session: a fresh agent loading the same conversation must not see the
3579        // soft-deleted orphans — the WARN log must not repeat.
3580        let mut agent2 = Agent::new(
3581            mock_provider(vec![]),
3582            MockChannel::new(vec![]),
3583            create_test_registry(),
3584            None,
3585            5,
3586            MockToolExecutor::no_tools(),
3587        )
3588        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3589        agent2.load_history().await.unwrap();
3590        let count_after_second = agent2.msg.messages.len();
3591
3592        // Both sessions must load the same number of messages — soft-deleted orphans excluded.
3593        assert_eq!(
3594            count_after_first, count_after_second,
3595            "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3596        );
3597    }
3598
3599    /// Edge case for #2529: an orphaned assistant message whose content has BOTH meaningful text
3600    /// AND a `tool_use` tag must NOT be soft-deleted. The `ToolUse` parts are stripped but the
3601    /// message is kept because it has human-readable content.
3602    #[tokio::test]
3603    async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3604        use zeph_llm::provider::MessagePart;
3605
3606        let provider = mock_provider(vec![]);
3607        let channel = MockChannel::new(vec![]);
3608        let registry = create_test_registry();
3609        let executor = MockToolExecutor::no_tools();
3610
3611        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3612        let cid = memory.sqlite().create_conversation().await.unwrap();
3613        let sqlite = memory.sqlite();
3614
3615        // Normal first exchange.
3616        sqlite
3617            .save_message(cid, "user", "check the files")
3618            .await
3619            .unwrap();
3620
3621        // Assistant message: has BOTH meaningful text AND a ToolUse part.
3622        // Content contains real prose + legacy tag — has_meaningful_content must return true.
3623        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3624            id: "call_mixed".to_string(),
3625            name: "shell".to_string(),
3626            input: serde_json::json!({"command": "ls"}),
3627        }])
3628        .unwrap();
3629        sqlite
3630            .save_message_with_parts(
3631                cid,
3632                "assistant",
3633                "Let me list the directory. [tool_use: shell(call_mixed)]",
3634                &use_parts,
3635            )
3636            .await
3637            .unwrap();
3638
3639        // No matching ToolResult follows — the ToolUse is orphaned.
3640        sqlite.save_message(cid, "user", "thanks").await.unwrap();
3641        sqlite
3642            .save_message(cid, "assistant", "you are welcome")
3643            .await
3644            .unwrap();
3645
3646        let memory_arc = std::sync::Arc::new(memory);
3647        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3648            memory_arc.clone(),
3649            cid,
3650            50,
3651            5,
3652            100,
3653        );
3654
3655        let messages_before = agent.msg.messages.len();
3656        agent.load_history().await.unwrap();
3657
3658        // All 4 messages must be present — the mixed-content assistant message is KEPT.
3659        assert_eq!(
3660            agent.msg.messages.len(),
3661            messages_before + 4,
3662            "assistant message with text + tool tag must not be removed after ToolUse strip"
3663        );
3664
3665        // The mixed-content assistant message must have its ToolUse parts stripped.
3666        let mixed_msg = agent
3667            .msg
3668            .messages
3669            .iter()
3670            .find(|m| m.content.contains("Let me list the directory"))
3671            .expect("mixed-content assistant message must still be in history");
3672        assert!(
3673            !mixed_msg
3674                .parts
3675                .iter()
3676                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3677            "orphaned ToolUse parts must be stripped even when message has meaningful text"
3678        );
3679        assert_eq!(
3680            mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3681            "content field must be unchanged — only parts are stripped"
3682        );
3683    }
3684
3685    // ── [skipped]/[stopped] ToolResult embedding guard (#2620) ──────────────
3686
3687    #[tokio::test]
3688    async fn persist_message_skipped_tool_result_does_not_embed() {
3689        use zeph_llm::provider::MessagePart;
3690
3691        let provider = mock_provider(vec![]);
3692        let channel = MockChannel::new(vec![]);
3693        let registry = create_test_registry();
3694        let executor = MockToolExecutor::no_tools();
3695
3696        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3697        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3698        let cid = memory.sqlite().create_conversation().await.unwrap();
3699
3700        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3701            .with_metrics(tx)
3702            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3703        agent.memory_state.persistence.autosave_assistant = true;
3704        agent.memory_state.persistence.autosave_min_length = 0;
3705
3706        let parts = vec![MessagePart::ToolResult {
3707            tool_use_id: "tu1".into(),
3708            content: "[skipped] bash tool was blocked by utility gate".into(),
3709            is_error: false,
3710        }];
3711
3712        agent
3713            .persist_message(
3714                Role::User,
3715                "[skipped] bash tool was blocked by utility gate",
3716                &parts,
3717                false,
3718            )
3719            .await;
3720
3721        assert_eq!(
3722            rx.borrow().embeddings_generated,
3723            0,
3724            "[skipped] ToolResult must not be embedded into Qdrant"
3725        );
3726    }
3727
3728    #[tokio::test]
3729    async fn persist_message_stopped_tool_result_does_not_embed() {
3730        use zeph_llm::provider::MessagePart;
3731
3732        let provider = mock_provider(vec![]);
3733        let channel = MockChannel::new(vec![]);
3734        let registry = create_test_registry();
3735        let executor = MockToolExecutor::no_tools();
3736
3737        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3738        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3739        let cid = memory.sqlite().create_conversation().await.unwrap();
3740
3741        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3742            .with_metrics(tx)
3743            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3744        agent.memory_state.persistence.autosave_assistant = true;
3745        agent.memory_state.persistence.autosave_min_length = 0;
3746
3747        let parts = vec![MessagePart::ToolResult {
3748            tool_use_id: "tu2".into(),
3749            content: "[stopped] execution limit reached".into(),
3750            is_error: false,
3751        }];
3752
3753        agent
3754            .persist_message(
3755                Role::User,
3756                "[stopped] execution limit reached",
3757                &parts,
3758                false,
3759            )
3760            .await;
3761
3762        assert_eq!(
3763            rx.borrow().embeddings_generated,
3764            0,
3765            "[stopped] ToolResult must not be embedded into Qdrant"
3766        );
3767    }
3768
3769    #[tokio::test]
3770    async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3771        // Regression: a normal ToolResult (no [skipped]/[stopped] prefix) must not be
3772        // suppressed by the utility-gate guard and must reach the save path (SQLite).
3773        use zeph_llm::provider::MessagePart;
3774
3775        let provider = mock_provider(vec![]);
3776        let channel = MockChannel::new(vec![]);
3777        let registry = create_test_registry();
3778        let executor = MockToolExecutor::no_tools();
3779
3780        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3781        let cid = memory.sqlite().create_conversation().await.unwrap();
3782        let memory_arc = std::sync::Arc::new(memory);
3783
3784        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3785            memory_arc.clone(),
3786            cid,
3787            50,
3788            5,
3789            100,
3790        );
3791        agent.memory_state.persistence.autosave_assistant = true;
3792        agent.memory_state.persistence.autosave_min_length = 0;
3793
3794        let content = "total 42\ndrwxr-xr-x  5 user group";
3795        let parts = vec![MessagePart::ToolResult {
3796            tool_use_id: "tu3".into(),
3797            content: content.into(),
3798            is_error: false,
3799        }];
3800
3801        agent
3802            .persist_message(Role::User, content, &parts, false)
3803            .await;
3804
3805        // Must be saved to SQLite — confirms the guard did not block this path.
3806        let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3807        assert_eq!(
3808            history.len(),
3809            1,
3810            "normal ToolResult must be saved to SQLite"
3811        );
3812        assert_eq!(history[0].content, content);
3813    }
3814
3815    /// Verify that `enqueue_trajectory_extraction_task` uses a bounded tail slice instead of
3816    /// cloning the full message vec. We confirm the slice logic by checking that the
3817    /// `tail_start` calculation correctly bounds the window even with more messages than
3818    /// `max_messages`.
3819    #[test]
3820    fn trajectory_extraction_slice_bounds_messages() {
3821        // Replicate the slice logic from enqueue_trajectory_extraction_task.
3822        let max_messages: usize = 20;
3823        let total_messages = 100usize;
3824
3825        let tail_start = total_messages.saturating_sub(max_messages);
3826        let window = total_messages - tail_start;
3827
3828        assert_eq!(
3829            window, 20,
3830            "slice should contain exactly max_messages items"
3831        );
3832        assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3833    }
3834
3835    #[test]
3836    fn trajectory_extraction_slice_handles_few_messages() {
3837        let max_messages: usize = 20;
3838        let total_messages = 5usize;
3839
3840        let tail_start = total_messages.saturating_sub(max_messages);
3841        let window = total_messages - tail_start;
3842
3843        assert_eq!(window, 5, "should return all messages when fewer than max");
3844        assert_eq!(tail_start, 0, "slice should start from the beginning");
3845    }
3846
3847    // --- #3168 regression tests ---
3848
3849    /// Round-trip: persist `Assistant[tool_use]` + `User[tool_result]`, then `load_history`.
3850    /// After `sanitize_tool_pairs` the pair must be intact — no WARN, both messages present
3851    /// with non-empty parts.
3852    #[tokio::test]
3853    async fn regression_3168_complete_tool_pair_survives_round_trip() {
3854        use zeph_llm::provider::MessagePart;
3855
3856        let provider = mock_provider(vec![]);
3857        let channel = MockChannel::new(vec![]);
3858        let registry = create_test_registry();
3859        let executor = MockToolExecutor::no_tools();
3860
3861        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3862        let cid = memory.sqlite().create_conversation().await.unwrap();
3863        let sqlite = memory.sqlite();
3864
3865        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3866            id: "r3168_call".to_string(),
3867            name: "shell".to_string(),
3868            input: serde_json::json!({"command": "echo hi"}),
3869        }])
3870        .unwrap();
3871        sqlite
3872            .save_message_with_parts(
3873                cid,
3874                "assistant",
3875                "[tool_use: shell(r3168_call)]",
3876                &use_parts,
3877            )
3878            .await
3879            .unwrap();
3880
3881        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3882            tool_use_id: "r3168_call".to_string(),
3883            content: "[skipped]".to_string(),
3884            is_error: false,
3885        }])
3886        .unwrap();
3887        sqlite
3888            .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3889            .await
3890            .unwrap();
3891
3892        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3893            std::sync::Arc::new(memory),
3894            cid,
3895            50,
3896            5,
3897            100,
3898        );
3899
3900        let base = agent.msg.messages.len();
3901        agent.load_history().await.unwrap();
3902
3903        assert_eq!(
3904            agent.msg.messages.len(),
3905            base + 2,
3906            "both messages of the complete pair must survive load_history"
3907        );
3908
3909        let assistant_msg = agent
3910            .msg
3911            .messages
3912            .iter()
3913            .find(|m| m.role == Role::Assistant)
3914            .expect("assistant message missing after load_history");
3915        assert!(
3916            assistant_msg
3917                .parts
3918                .iter()
3919                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3920            "ToolUse part must be preserved in assistant message"
3921        );
3922
3923        let user_msg = agent
3924            .msg
3925            .messages
3926            .iter()
3927            .rev()
3928            .find(|m| m.role == Role::User)
3929            .expect("user message missing after load_history");
3930        assert!(
3931            user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3932            "ToolResult part must be preserved in user message"
3933        );
3934    }
3935
3936    /// A System message between an `Assistant[tool_use]` and the matching `User[tool_result]`
3937    /// must not cause the `tool_use` to be treated as an orphan.
3938    #[test]
3939    fn regression_3168_system_between_tool_pair_not_stripped() {
3940        use zeph_llm::provider::{MessageMetadata, MessagePart};
3941
3942        let tool_id = "call_sys_between".to_string();
3943
3944        let mut messages = vec![
3945            Message {
3946                role: Role::Assistant,
3947                content: "[tool_use: shell(call_sys_between)]".to_string(),
3948                parts: vec![MessagePart::ToolUse {
3949                    id: tool_id.clone(),
3950                    name: "shell".to_string(),
3951                    input: serde_json::json!({"command": "ls"}),
3952                }],
3953                metadata: MessageMetadata::default(),
3954            },
3955            Message {
3956                role: Role::System,
3957                content: "system hint injected between tool calls".to_string(),
3958                parts: vec![],
3959                metadata: MessageMetadata::default(),
3960            },
3961            Message {
3962                role: Role::User,
3963                content: "[tool_result: call_sys_between]".to_string(),
3964                parts: vec![MessagePart::ToolResult {
3965                    tool_use_id: tool_id.clone(),
3966                    content: "output".to_string(),
3967                    is_error: false,
3968                }],
3969                metadata: MessageMetadata::default(),
3970            },
3971        ];
3972
3973        let (removed, _db_ids) = strip_mid_history_orphans(&mut messages);
3974
3975        assert_eq!(
3976            removed, 0,
3977            "no messages should be removed when System sits between a matched tool_use/tool_result pair"
3978        );
3979        assert_eq!(messages.len(), 3, "all three messages must remain");
3980        assert!(
3981            messages[0]
3982                .parts
3983                .iter()
3984                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == &tool_id)),
3985            "ToolUse part must not be stripped from assistant message"
3986        );
3987    }
3988
3989    /// If parts serialization fails, `persist_message` must return early and not store
3990    /// a row with empty parts that would create an orphaned `tool_use` on next session load.
3991    /// We verify this by writing a row with invalid `parts_json` directly and confirming
3992    /// `load_history` skips it (empty parts row is not injected into the agent).
3993    #[tokio::test]
3994    async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3995        use zeph_llm::provider::MessagePart;
3996
3997        let provider = mock_provider(vec![]);
3998        let channel = MockChannel::new(vec![]);
3999        let registry = create_test_registry();
4000        let executor = MockToolExecutor::no_tools();
4001
4002        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
4003        let cid = memory.sqlite().create_conversation().await.unwrap();
4004        let sqlite = memory.sqlite();
4005
4006        // Simulate the pre-fix bug: assistant message stored with empty parts_json "[]"
4007        // even though it should have had a ToolUse part. This is what persist_message used
4008        // to do before the early-return fix.
4009        sqlite
4010            .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
4011            .await
4012            .unwrap();
4013
4014        // User message with the matching tool_result stored correctly.
4015        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
4016            tool_use_id: "corrupt".to_string(),
4017            content: "result".to_string(),
4018            is_error: false,
4019        }])
4020        .unwrap();
4021        sqlite
4022            .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
4023            .await
4024            .unwrap();
4025
4026        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
4027            std::sync::Arc::new(memory),
4028            cid,
4029            50,
4030            5,
4031            100,
4032        );
4033
4034        let base = agent.msg.messages.len();
4035        agent.load_history().await.unwrap();
4036
4037        // The user ToolResult has no matching ToolUse in the assistant message (parts="[]"),
4038        // so sanitize_tool_pairs must strip the orphaned ToolResult.
4039        // Net result: only the content-only assistant message survives; user msg is removed
4040        // because after stripping its only part it becomes empty.
4041        let loaded = agent.msg.messages.len() - base;
4042        // No message injected with orphaned ToolResult parts — either stripped entirely or
4043        // the ToolResult part removed. Verify no user message with ToolResult remains.
4044        let orphan_present = agent.msg.messages.iter().any(|m| {
4045            m.role == Role::User
4046                && m.parts.iter().any(|p| {
4047                    matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
4048                })
4049        });
4050        assert!(
4051            !orphan_present,
4052            "orphaned ToolResult must not survive load_history; loaded={loaded}"
4053        );
4054    }
4055}