Skip to main content

zeph_core/agent/
persistence.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::channel::Channel;
5use zeph_agent_persistence::graph::{build_graph_extraction_config, collect_context_messages};
6use zeph_agent_persistence::{
7    LoadHistoryParams, MemoryPersistenceView, MetricsView, PersistMessageRequest,
8    PersistenceService, SecurityView,
9};
10use zeph_llm::provider::{LlmProvider as _, MessagePart, Role};
11
12use super::Agent;
13
14impl<C: Channel> Agent<C> {
15    /// Load conversation history from memory and inject into messages.
16    ///
17    /// Delegates to [`PersistenceService::load_history`]. Post-load operations that touch
18    /// agent-internal singletons (session count increment, semantic fact count recompute,
19    /// token recompute) remain in this shim because they access fields outside the
20    /// borrow-lens view.
21    ///
22    /// # Errors
23    ///
24    /// Returns an error if loading history from `SQLite` fails.
25    ///
26    /// # Panics
27    ///
28    /// Does not panic. The internal `unwrap_or(0)` conversions are on fallible `i64 → usize`
29    /// casts that saturate to zero on overflow; they cannot panic.
30    #[tracing::instrument(name = "core.persist.load_history", skip_all, level = "debug", err)]
31    pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
32        let (Some(memory), Some(cid)) = (
33            self.services.memory.persistence.memory.as_ref(),
34            self.services.memory.persistence.conversation_id,
35        ) else {
36            return Ok(());
37        };
38
39        // Clone so we can call methods after the borrow-lens view is dropped.
40        let memory = memory.clone();
41
42        let mut unsummarized = self.services.memory.persistence.unsummarized_count;
43        // `memory_view` is not `mut` — the `&mut unsummarized` inside is established at
44        // construction and passed as `&memory_view` to load_history (shared borrow).
45        let memory_view = MemoryPersistenceView {
46            memory: Some(&memory),
47            conversation_id: self.services.memory.persistence.conversation_id,
48            autosave_assistant: self.services.memory.persistence.autosave_assistant,
49            autosave_min_length: self.services.memory.persistence.autosave_min_length,
50            unsummarized_count: &mut unsummarized,
51            goal_text: self.services.memory.extraction.goal_text.clone(),
52        };
53
54        let svc = PersistenceService::new();
55        let outcome = svc
56            .load_history(LoadHistoryParams {
57                messages: &mut self.msg.messages,
58                last_persisted_message_id: &mut self.msg.last_persisted_message_id,
59                deferred_hide_ids: &mut self.msg.deferred_db_hide_ids,
60                memory_view: &memory_view,
61            })
62            .await
63            .map_err(|e| {
64                super::error::AgentError::Memory(zeph_memory::MemoryError::Other(e.to_string()))
65            })?;
66
67        // Write back lens-borrowed local to the field.
68        self.services.memory.persistence.unsummarized_count = unsummarized;
69
70        if outcome.messages_loaded > 0 {
71            // Increment session counts so tier promotion can track cross-session access.
72            let _ = memory
73                .sqlite()
74                .increment_session_counts_for_conversation(cid)
75                .await
76                .inspect_err(|e| {
77                    tracing::warn!(error = %e, "failed to increment tier session counts");
78                });
79        }
80
81        // Set absolute SQLite message count and semantic fact count (not deltas).
82        self.update_metrics(|m| {
83            m.sqlite_message_count = outcome.sqlite_total_messages;
84        });
85        if let Ok(count) = memory.sqlite().count_semantic_facts().await {
86            let count_u64 = u64::try_from(count).unwrap_or(0);
87            self.update_metrics(|m| {
88                m.semantic_fact_count = count_u64;
89            });
90        }
91        if let Ok(count) = memory.unsummarized_message_count(cid).await {
92            self.services.memory.persistence.unsummarized_count =
93                usize::try_from(count).unwrap_or(0);
94        }
95
96        self.recompute_prompt_tokens();
97        Ok(())
98    }
99
100    /// Persist a message to memory.
101    ///
102    /// `has_injection_flags` controls whether Qdrant embedding is skipped for this message.
103    /// When `true` and `guard_memory_writes` is enabled, only `SQLite` is written — the message
104    /// is saved for conversation continuity but will not pollute semantic search (M2, D2).
105    #[tracing::instrument(name = "core.persist.persist_message", skip_all, level = "debug")]
106    pub(crate) async fn persist_message(
107        &mut self,
108        role: Role,
109        content: &str,
110        parts: &[MessagePart],
111        has_injection_flags: bool,
112    ) {
113        // M2: call should_guard_memory_write for its diagnostic side effects (tracing + security
114        // event). The bool result is passed into SecurityView so the service can decide whether
115        // to skip Qdrant embedding.
116        let guard_event = self
117            .services
118            .security
119            .exfiltration_guard
120            .should_guard_memory_write(has_injection_flags);
121        if let Some(ref event) = guard_event {
122            tracing::warn!(
123                ?event,
124                "exfiltration guard: skipping Qdrant embedding for flagged content"
125            );
126            self.push_security_event(
127                zeph_common::SecurityEventCategory::ExfiltrationBlock,
128                "memory_write",
129                "Qdrant embedding skipped: flagged content",
130            );
131        }
132
133        let req = PersistMessageRequest::from_borrowed(role, content, parts, has_injection_flags);
134
135        let mut unsummarized = self.services.memory.persistence.unsummarized_count;
136        let memory_arc = self.services.memory.persistence.memory.clone();
137        let mut memory_view = MemoryPersistenceView {
138            memory: memory_arc.as_ref(),
139            conversation_id: self.services.memory.persistence.conversation_id,
140            autosave_assistant: self.services.memory.persistence.autosave_assistant,
141            autosave_min_length: self.services.memory.persistence.autosave_min_length,
142            unsummarized_count: &mut unsummarized,
143            goal_text: self.services.memory.extraction.goal_text.clone(),
144        };
145        let security = SecurityView {
146            guard_memory_writes: guard_event.is_some(),
147            _phantom: std::marker::PhantomData,
148        };
149        let mut sqlite_delta = 0u64;
150        let mut embed_delta = 0u64;
151        let mut guard_delta = 0u64;
152        let mut metrics_view = MetricsView {
153            sqlite_message_count: &mut sqlite_delta,
154            embeddings_generated: &mut embed_delta,
155            exfiltration_memory_guards: &mut guard_delta,
156        };
157
158        let svc = PersistenceService::new();
159        let outcome = svc
160            .persist_message(
161                req,
162                &mut self.msg.last_persisted_message_id,
163                &mut memory_view,
164                &security,
165                &mut metrics_view,
166            )
167            .await;
168
169        // Write back the unsummarized counter (lens borrowed a local copy).
170        self.services.memory.persistence.unsummarized_count = unsummarized;
171
172        // Forward metric deltas through the watch broadcast.
173        self.update_metrics(|m| {
174            m.sqlite_message_count += sqlite_delta;
175            m.embeddings_generated += embed_delta;
176            // guard_delta is already tracked via push_security_event above.
177            m.exfiltration_memory_guards += guard_delta;
178        });
179
180        if outcome.message_id.is_none() {
181            return;
182        }
183
184        // Phase 2: enqueue enrichment tasks via supervisor (non-blocking).
185        // check_summarization signals completion via SummarizationSignal, consumed in reap()
186        // between turns — no shared mutable state across tasks (S1 fix).
187        self.enqueue_summarization_task();
188
189        // FIX-1: skip graph extraction for tool result messages — they contain raw structured
190        // output (TOML, JSON, code) that pollutes the entity graph with noise.
191        let has_tool_result_parts = parts
192            .iter()
193            .any(|p| matches!(p, MessagePart::ToolResult { .. }));
194
195        self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
196            .await;
197
198        // Persona extraction: run only for user messages that are not tool results and not injected.
199        if role == Role::User && !has_tool_result_parts && !has_injection_flags {
200            self.enqueue_persona_extraction_task();
201        }
202
203        // Trajectory extraction: run after turns that contained tool results.
204        if has_tool_result_parts {
205            self.enqueue_trajectory_extraction_task();
206        }
207
208        // ReasoningBank distillation: runs only after the final assistant message of a turn
209        // (C2 fix: skip intermediate tool-call messages). A message with ToolUse parts is an
210        // intermediate step; the final assistant message has no ToolUse parts.
211        // S-Med1: skip if injection patterns detected — mirrors graph extraction guard.
212        let has_tool_use_parts = parts
213            .iter()
214            .any(|p| matches!(p, MessagePart::ToolUse { .. }));
215        if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
216            self.enqueue_reasoning_extraction_task();
217            // MemCoT distillation: same guards as ReasoningBank.
218            self.enqueue_memcot_distill_task(content);
219        }
220    }
221
222    /// Enqueue `MemCoT` semantic state distillation via the supervisor.
223    ///
224    /// All cost gates (interval, session cap, min chars) are checked inside
225    /// [`crate::agent::memcot::SemanticStateAccumulator::maybe_enqueue_distill`].
226    fn enqueue_memcot_distill_task(&mut self, assistant_content: &str) {
227        let Some(accumulator) = &self.services.memory.extraction.memcot_accumulator else {
228            return;
229        };
230        let distill_provider_name = self
231            .services
232            .memory
233            .extraction
234            .memcot_config
235            .distill_provider
236            .as_str();
237        let provider = self.resolve_background_provider(distill_provider_name);
238
239        let content = assistant_content.to_owned();
240        let supervisor = &mut self.runtime.lifecycle.supervisor;
241
242        accumulator.maybe_enqueue_distill(&content, provider, |name, fut| {
243            supervisor.spawn(super::agent_supervisor::TaskClass::Enrichment, name, fut);
244        });
245    }
246
247    /// Enqueue background summarization via the supervisor (S1 fix: no shared `AtomicUsize`).
248    fn enqueue_summarization_task(&mut self) {
249        let (Some(memory), Some(cid)) = (
250            self.services.memory.persistence.memory.clone(),
251            self.services.memory.persistence.conversation_id,
252        ) else {
253            return;
254        };
255
256        if self.services.memory.persistence.unsummarized_count
257            <= self.services.memory.compaction.summarization_threshold
258        {
259            return;
260        }
261
262        let batch_size = self.services.memory.compaction.summarization_threshold / 2;
263
264        self.runtime.lifecycle.supervisor.spawn_summarization("summarization", async move {
265            match tokio::time::timeout(
266                std::time::Duration::from_secs(30),
267                memory.summarize(cid, batch_size),
268            )
269            .await
270            {
271                Ok(Ok(Some(summary_id))) => {
272                    tracing::info!(
273                        "background summarization: created summary {summary_id} for conversation {cid}"
274                    );
275                    true
276                }
277                Ok(Ok(None)) => {
278                    tracing::debug!("background summarization: no summarization needed");
279                    false
280                }
281                Ok(Err(e)) => {
282                    tracing::error!("background summarization failed: {e:#}");
283                    false
284                }
285                Err(_) => {
286                    tracing::warn!("background summarization timed out after 30s");
287                    false
288                }
289            }
290        });
291    }
292
293    /// Prepare graph extraction guards in foreground, then enqueue heavy work via supervisor.
294    ///
295    /// Guards (enabled check, injection/tool-result skip) stay on the foreground path.
296    /// The RPE check and actual extraction run in background (S2: no `send_status`).
297    #[tracing::instrument(
298        name = "core.persist.enqueue_graph_extraction",
299        skip_all,
300        level = "debug"
301    )]
302    async fn enqueue_graph_extraction_task(
303        &mut self,
304        content: &str,
305        has_injection_flags: bool,
306        has_tool_result_parts: bool,
307    ) {
308        if self.services.memory.persistence.memory.is_none()
309            || self.services.memory.persistence.conversation_id.is_none()
310        {
311            return;
312        }
313        if has_tool_result_parts {
314            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
315            return;
316        }
317        if has_injection_flags {
318            tracing::warn!("graph extraction skipped: injection patterns detected in content");
319            return;
320        }
321
322        let cfg = &self.services.memory.extraction.graph_config;
323        if !cfg.enabled {
324            return;
325        }
326        let embed_timeout_secs = self
327            .services
328            .memory
329            .persistence
330            .memory
331            .as_ref()
332            .map_or(5, |m| m.embed_timeout().as_secs());
333        let extraction_cfg = build_graph_extraction_config(
334            cfg,
335            self.services
336                .memory
337                .persistence
338                .conversation_id
339                .map(|c| c.0),
340            embed_timeout_secs,
341        );
342        // Resolve a clean provider that bypasses quality_gate for JSON extraction tasks.
343        // When extract_provider is empty, falls back to the primary provider (existing behavior).
344        let extract_provider_name = cfg.extract_provider.as_str().to_owned();
345
346        // RPE check: embed + compute surprise score. Stays on foreground to avoid
347        // capturing the rpe_router mutex in a background task.
348        if self.rpe_should_skip(content).await {
349            tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
350            return;
351        }
352
353        let context_messages = collect_context_messages(&self.msg.messages);
354
355        let Some(memory) = self.services.memory.persistence.memory.clone() else {
356            return;
357        };
358
359        let validator: zeph_memory::semantic::PostExtractValidator =
360            if self.services.security.memory_validator.is_enabled() {
361                let v = self.services.security.memory_validator.clone();
362                Some(Box::new(move |result| {
363                    v.validate_graph_extraction(result)
364                        .map_err(|e| e.to_string())
365                }))
366            } else {
367                None
368            };
369
370        let provider_override = if extract_provider_name.is_empty() {
371            None
372        } else {
373            Some(self.resolve_background_provider(&extract_provider_name))
374        };
375
376        self.spawn_graph_extraction_task(
377            memory,
378            content,
379            context_messages,
380            extraction_cfg,
381            validator,
382            provider_override,
383        );
384
385        // Sync community failures and extraction metrics (cheap, foreground-safe).
386        self.sync_community_detection_failures();
387        self.sync_graph_extraction_metrics();
388        self.enqueue_graph_count_sync_task();
389    }
390
391    fn spawn_graph_extraction_task(
392        &mut self,
393        memory: std::sync::Arc<zeph_memory::semantic::SemanticMemory>,
394        content: &str,
395        context_messages: Vec<String>,
396        extraction_cfg: zeph_memory::semantic::GraphExtractionConfig,
397        validator: zeph_memory::semantic::PostExtractValidator,
398        provider_override: Option<zeph_llm::any::AnyProvider>,
399    ) {
400        let content_owned = content.to_owned();
401        let graph_store = memory.graph_store.clone();
402        let metrics_tx = self.runtime.metrics.metrics_tx.clone();
403        let start_time = self.runtime.lifecycle.start_time;
404        let cancel = self.runtime.lifecycle.cancel_token.child_token();
405
406        self.runtime.lifecycle.supervisor.spawn(
407            super::agent_supervisor::TaskClass::Enrichment,
408            "graph_extraction",
409            async move {
410                let extraction_handle = memory.spawn_graph_extraction(
411                    content_owned,
412                    context_messages,
413                    extraction_cfg,
414                    validator,
415                    provider_override,
416                    cancel,
417                );
418
419                // After extraction completes, refresh graph count metrics.
420                if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
421                    let _ = extraction_handle.await;
422                    let (entities, edges, communities) = tokio::join!(
423                        store.entity_count(),
424                        store.active_edge_count(),
425                        store.community_count()
426                    );
427                    let elapsed = start_time.elapsed().as_secs();
428                    tx.send_modify(|m| {
429                        m.uptime_seconds = elapsed;
430                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
431                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
432                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
433                    });
434                } else {
435                    let _ = extraction_handle.await;
436                }
437
438                tracing::debug!("background graph extraction complete");
439            },
440        );
441    }
442
443    // sync_graph_counts and sync_guidelines_status are DB reads; enqueued as Telemetry background.
444    fn enqueue_graph_count_sync_task(&mut self) {
445        let memory_for_sync = self.services.memory.persistence.memory.clone();
446        let metrics_tx_sync = self.runtime.metrics.metrics_tx.clone();
447        let start_time_sync = self.runtime.lifecycle.start_time;
448        let cid_sync = self.services.memory.persistence.conversation_id;
449        let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
450        let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
451        let guidelines_enabled = self.services.memory.extraction.graph_config.enabled;
452
453        self.runtime.lifecycle.supervisor.spawn(
454            super::agent_supervisor::TaskClass::Telemetry,
455            "graph_count_sync",
456            async move {
457                let Some(store) = graph_store_sync else {
458                    return;
459                };
460                let Some(tx) = metrics_tx_sync else { return };
461
462                let (entities, edges, communities) = tokio::join!(
463                    store.entity_count(),
464                    store.active_edge_count(),
465                    store.community_count()
466                );
467                let elapsed = start_time_sync.elapsed().as_secs();
468                tx.send_modify(|m| {
469                    m.uptime_seconds = elapsed;
470                    m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
471                    m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
472                    m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
473                });
474
475                // Sync guidelines status.
476                if guidelines_enabled && let Some(sqlite) = sqlite_sync {
477                    match tokio::time::timeout(
478                        std::time::Duration::from_secs(10),
479                        sqlite.load_compression_guidelines_meta(cid_sync),
480                    )
481                    .await
482                    {
483                        Ok(Ok((version, created_at))) => {
484                            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
485                            let version_u32 = u32::try_from(version).unwrap_or(0);
486                            tx.send_modify(|m| {
487                                m.guidelines_version = version_u32;
488                                m.guidelines_updated_at = created_at;
489                            });
490                        }
491                        Ok(Err(e)) => {
492                            tracing::debug!("guidelines status sync failed: {e:#}");
493                        }
494                        Err(_) => {
495                            tracing::debug!("guidelines status sync timed out");
496                        }
497                    }
498                }
499            },
500        );
501    }
502
503    /// Enqueue persona extraction via supervisor (background, no `send_status`).
504    fn enqueue_persona_extraction_task(&mut self) {
505        use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
506
507        let cfg = &self.services.memory.extraction.persona_config;
508        if !cfg.enabled {
509            return;
510        }
511
512        let Some(memory) = &self.services.memory.persistence.memory else {
513            return;
514        };
515
516        let user_messages: Vec<String> = self
517            .msg
518            .messages
519            .iter()
520            .filter(|m| {
521                m.role == Role::User
522                    && !m
523                        .parts
524                        .iter()
525                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
526            })
527            .take(8)
528            .map(|m| {
529                if m.content.len() > 2048 {
530                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
531                } else {
532                    m.content.clone()
533                }
534            })
535            .collect();
536
537        if user_messages.len() < cfg.min_messages {
538            return;
539        }
540
541        let timeout_secs = cfg.extraction_timeout_secs;
542        let extraction_cfg = PersonaExtractionConfig {
543            enabled: cfg.enabled,
544            min_messages: cfg.min_messages,
545            max_messages: cfg.max_messages,
546            extraction_timeout_secs: timeout_secs,
547        };
548
549        let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
550        let store = memory.sqlite().clone();
551        let conversation_id = self
552            .services
553            .memory
554            .persistence
555            .conversation_id
556            .map(|c| c.0);
557
558        self.runtime.lifecycle.supervisor.spawn(
559            super::agent_supervisor::TaskClass::Enrichment,
560            "persona_extraction",
561            async move {
562                let user_message_refs: Vec<&str> =
563                    user_messages.iter().map(String::as_str).collect();
564                let fut = extract_persona_facts(
565                    &store,
566                    &provider,
567                    &user_message_refs,
568                    &extraction_cfg,
569                    conversation_id,
570                );
571                match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
572                {
573                    Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
574                    Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
575                    Err(_) => tracing::warn!(
576                        timeout_secs,
577                        "persona extraction timed out — no facts written this turn"
578                    ),
579                }
580            },
581        );
582    }
583
584    /// Enqueue trajectory extraction via supervisor (background).
585    fn enqueue_trajectory_extraction_task(&mut self) {
586        use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
587
588        let cfg = self.services.memory.extraction.trajectory_config.clone();
589        if !cfg.enabled {
590            return;
591        }
592
593        let Some(memory) = &self.services.memory.persistence.memory else {
594            return;
595        };
596
597        let conversation_id = match self.services.memory.persistence.conversation_id {
598            Some(cid) => cid.0,
599            None => return,
600        };
601
602        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
603        let turn_messages: Vec<zeph_llm::provider::Message> =
604            self.msg.messages[tail_start..].to_vec();
605
606        if turn_messages.is_empty() {
607            return;
608        }
609
610        let extraction_cfg = TrajectoryExtractionConfig {
611            enabled: cfg.enabled,
612            max_messages: cfg.max_messages,
613            extraction_timeout_secs: cfg.extraction_timeout_secs,
614        };
615
616        let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
617        let store = memory.sqlite().clone();
618        let min_confidence = cfg.min_confidence;
619
620        self.runtime.lifecycle.supervisor.spawn(
621            super::agent_supervisor::TaskClass::Enrichment,
622            "trajectory_extraction",
623            async move {
624                let entries =
625                    match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
626                        .await
627                    {
628                        Ok(e) => e,
629                        Err(e) => {
630                            tracing::warn!(error = %e, "trajectory extraction failed");
631                            return;
632                        }
633                    };
634
635                let last_id = store
636                    .trajectory_last_extracted_message_id(conversation_id)
637                    .await
638                    .unwrap_or(0);
639
640                let mut max_id = last_id;
641                for entry in &entries {
642                    if entry.confidence < min_confidence {
643                        continue;
644                    }
645                    let tools_json = serde_json::to_string(&entry.tools_used)
646                        .unwrap_or_else(|_| "[]".to_string());
647                    match store
648                        .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
649                            conversation_id: Some(conversation_id),
650                            turn_index: 0,
651                            kind: &entry.kind,
652                            intent: &entry.intent,
653                            outcome: &entry.outcome,
654                            tools_used: &tools_json,
655                            confidence: entry.confidence,
656                        })
657                        .await
658                    {
659                        Ok(id) => {
660                            if id > max_id {
661                                max_id = id;
662                            }
663                        }
664                        Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
665                    }
666                }
667
668                if max_id > last_id {
669                    let _ = store
670                        .set_trajectory_last_extracted_message_id(conversation_id, max_id)
671                        .await;
672                }
673
674                tracing::debug!(
675                    count = entries.len(),
676                    conversation_id,
677                    "trajectory extraction complete"
678                );
679            },
680        );
681    }
682
683    /// Enqueue reasoning strategy distillation via supervisor (background, fire-and-forget).
684    ///
685    /// Mirrors [`Self::enqueue_trajectory_extraction_task`]. Runs after every assistant turn
686    /// when `memory.reasoning.enabled = true` and a `ReasoningMemory` is attached.
687    fn enqueue_reasoning_extraction_task(&mut self) {
688        let cfg = self.services.memory.extraction.reasoning_config.clone();
689        if !cfg.enabled {
690            return;
691        }
692
693        let Some(memory) = &self.services.memory.persistence.memory else {
694            return;
695        };
696
697        let Some(reasoning) = memory.reasoning.clone() else {
698            return;
699        };
700
701        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
702        let turn_messages: Vec<zeph_llm::provider::Message> =
703            self.msg.messages[tail_start..].to_vec();
704
705        if turn_messages.len() < cfg.min_messages {
706            return;
707        }
708
709        let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
710        let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
711        let embed_provider = memory.effective_embed_provider().clone();
712        let store_limit = cfg.store_limit;
713        let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
714        let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
715        let embed_timeout = memory.embed_timeout();
716        let self_judge_window = cfg.self_judge_window;
717        let min_assistant_chars = cfg.min_assistant_chars;
718
719        self.runtime.lifecycle.supervisor.spawn(
720            super::agent_supervisor::TaskClass::Enrichment,
721            "reasoning_extraction",
722            async move {
723                if let Err(e) = zeph_memory::process_reasoning_turn(
724                    &reasoning,
725                    &extract_provider,
726                    &distill_provider,
727                    &embed_provider,
728                    &turn_messages,
729                    zeph_memory::ProcessTurnConfig {
730                        store_limit,
731                        extraction_timeout,
732                        distill_timeout,
733                        embed_timeout,
734                        self_judge_window,
735                        min_assistant_chars,
736                    },
737                )
738                .await
739                {
740                    tracing::warn!(error = %e, "reasoning: process_turn failed");
741                }
742
743                tracing::debug!("reasoning extraction complete");
744            },
745        );
746    }
747
748    /// D-MEM RPE check: returns `true` when the current turn should skip graph extraction.
749    ///
750    /// Embeds `content`, computes RPE via the router, and updates the router state.
751    /// Returns `false` (do not skip) on any error — conservative fallback.
752    async fn rpe_should_skip(&mut self, content: &str) -> bool {
753        let Some(ref rpe_mutex) = self.services.memory.extraction.rpe_router else {
754            return false;
755        };
756        let Some(memory) = &self.services.memory.persistence.memory else {
757            return false;
758        };
759        let candidates = zeph_memory::extract_candidate_entities(content);
760        let provider = memory.provider();
761        let Ok(Ok(emb_vec)) =
762            tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
763        else {
764            return false; // embed failed/timed out → extract
765        };
766        if let Ok(mut router) = rpe_mutex.lock() {
767            let signal = router.compute(&emb_vec, &candidates);
768            router.push_embedding(emb_vec);
769            router.push_entities(&candidates);
770            !signal.should_extract
771        } else {
772            tracing::warn!("rpe_router mutex poisoned; falling through to extract");
773            false
774        }
775    }
776}
777
778#[cfg(test)]
779mod tests {
780    use super::super::agent_tests::{
781        MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
782    };
783    use super::*;
784    use zeph_llm::any::AnyProvider;
785    use zeph_llm::provider::Message;
786    use zeph_memory::semantic::SemanticMemory;
787
788    async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
789        SemanticMemory::new(
790            ":memory:",
791            "http://127.0.0.1:1",
792            None,
793            provider.clone(),
794            "test-model",
795        )
796        .await
797        .unwrap()
798    }
799
800    #[tokio::test]
801    async fn load_history_without_memory_returns_ok() {
802        let provider = mock_provider(vec![]);
803        let channel = MockChannel::new(vec![]);
804        let registry = create_test_registry();
805        let executor = MockToolExecutor::no_tools();
806        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
807
808        let result = agent.load_history().await;
809        assert!(result.is_ok());
810        // No messages added when no memory is configured
811        assert_eq!(agent.msg.messages.len(), 1); // system prompt only
812    }
813
814    #[tokio::test]
815    async fn load_history_with_messages_injects_into_agent() {
816        let provider = mock_provider(vec![]);
817        let channel = MockChannel::new(vec![]);
818        let registry = create_test_registry();
819        let executor = MockToolExecutor::no_tools();
820
821        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
822        let cid = memory.sqlite().create_conversation().await.unwrap();
823
824        memory
825            .sqlite()
826            .save_message(cid, "user", "hello from history")
827            .await
828            .unwrap();
829        memory
830            .sqlite()
831            .save_message(cid, "assistant", "hi back")
832            .await
833            .unwrap();
834
835        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
836            std::sync::Arc::new(memory),
837            cid,
838            50,
839            5,
840            100,
841        );
842
843        let messages_before = agent.msg.messages.len();
844        agent.load_history().await.unwrap();
845        // Two messages were added from history
846        assert_eq!(agent.msg.messages.len(), messages_before + 2);
847    }
848
849    #[tokio::test]
850    async fn load_history_skips_empty_messages() {
851        let provider = mock_provider(vec![]);
852        let channel = MockChannel::new(vec![]);
853        let registry = create_test_registry();
854        let executor = MockToolExecutor::no_tools();
855
856        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
857        let cid = memory.sqlite().create_conversation().await.unwrap();
858
859        // Save an empty message (should be skipped) and a valid one
860        memory
861            .sqlite()
862            .save_message(cid, "user", "   ")
863            .await
864            .unwrap();
865        memory
866            .sqlite()
867            .save_message(cid, "user", "real message")
868            .await
869            .unwrap();
870
871        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
872            std::sync::Arc::new(memory),
873            cid,
874            50,
875            5,
876            100,
877        );
878
879        let messages_before = agent.msg.messages.len();
880        agent.load_history().await.unwrap();
881        // Only the non-empty message is loaded
882        assert_eq!(agent.msg.messages.len(), messages_before + 1);
883    }
884
885    #[tokio::test]
886    async fn load_history_with_empty_store_returns_ok() {
887        let provider = mock_provider(vec![]);
888        let channel = MockChannel::new(vec![]);
889        let registry = create_test_registry();
890        let executor = MockToolExecutor::no_tools();
891
892        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
893        let cid = memory.sqlite().create_conversation().await.unwrap();
894
895        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
896            std::sync::Arc::new(memory),
897            cid,
898            50,
899            5,
900            100,
901        );
902
903        let messages_before = agent.msg.messages.len();
904        agent.load_history().await.unwrap();
905        // No messages added — empty history
906        assert_eq!(agent.msg.messages.len(), messages_before);
907    }
908
909    #[tokio::test]
910    async fn load_history_increments_session_count_for_existing_messages() {
911        let provider = mock_provider(vec![]);
912        let channel = MockChannel::new(vec![]);
913        let registry = create_test_registry();
914        let executor = MockToolExecutor::no_tools();
915
916        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
917        let cid = memory.sqlite().create_conversation().await.unwrap();
918
919        // Save two messages — they start with session_count = 0.
920        let id1 = memory
921            .sqlite()
922            .save_message(cid, "user", "hello")
923            .await
924            .unwrap();
925        let id2 = memory
926            .sqlite()
927            .save_message(cid, "assistant", "hi")
928            .await
929            .unwrap();
930
931        let memory_arc = std::sync::Arc::new(memory);
932        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
933            memory_arc.clone(),
934            cid,
935            50,
936            5,
937            100,
938        );
939
940        agent.load_history().await.unwrap();
941
942        // Both episodic messages must have session_count = 1 after restore.
943        let counts: Vec<i64> = zeph_db::query_scalar(
944            "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
945        )
946        .bind(id1)
947        .bind(id2)
948        .fetch_all(memory_arc.sqlite().pool())
949        .await
950        .unwrap();
951        assert_eq!(
952            counts,
953            vec![1, 1],
954            "session_count must be 1 after first restore"
955        );
956    }
957
958    #[tokio::test]
959    async fn load_history_does_not_increment_session_count_for_new_conversation() {
960        let provider = mock_provider(vec![]);
961        let channel = MockChannel::new(vec![]);
962        let registry = create_test_registry();
963        let executor = MockToolExecutor::no_tools();
964
965        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
966        let cid = memory.sqlite().create_conversation().await.unwrap();
967
968        // No messages saved — empty conversation.
969        let memory_arc = std::sync::Arc::new(memory);
970        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
971            memory_arc.clone(),
972            cid,
973            50,
974            5,
975            100,
976        );
977
978        agent.load_history().await.unwrap();
979
980        // No rows → no session_count increments → query returns empty.
981        let counts: Vec<i64> =
982            zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
983                .bind(cid)
984                .fetch_all(memory_arc.sqlite().pool())
985                .await
986                .unwrap();
987        assert!(counts.is_empty(), "new conversation must have no messages");
988    }
989
990    #[tokio::test]
991    async fn persist_message_without_memory_silently_returns() {
992        // No memory configured — persist_message must not panic
993        let provider = mock_provider(vec![]);
994        let channel = MockChannel::new(vec![]);
995        let registry = create_test_registry();
996        let executor = MockToolExecutor::no_tools();
997        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
998
999        // Must not panic and must complete
1000        agent.persist_message(Role::User, "hello", &[], false).await;
1001    }
1002
1003    #[tokio::test]
1004    async fn persist_message_assistant_autosave_false_uses_save_only() {
1005        let provider = mock_provider(vec![]);
1006        let channel = MockChannel::new(vec![]);
1007        let registry = create_test_registry();
1008        let executor = MockToolExecutor::no_tools();
1009
1010        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1011        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1012        let cid = memory.sqlite().create_conversation().await.unwrap();
1013
1014        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1015            .with_metrics(tx)
1016            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1017        agent.services.memory.persistence.autosave_assistant = false;
1018        agent.services.memory.persistence.autosave_min_length = 20;
1019
1020        agent
1021            .persist_message(Role::Assistant, "short assistant reply", &[], false)
1022            .await;
1023
1024        let history = agent
1025            .services
1026            .memory
1027            .persistence
1028            .memory
1029            .as_ref()
1030            .unwrap()
1031            .sqlite()
1032            .load_history(cid, 50)
1033            .await
1034            .unwrap();
1035        assert_eq!(history.len(), 1, "message must be saved");
1036        assert_eq!(history[0].content, "short assistant reply");
1037        // embeddings_generated must remain 0 — save_only path does not embed
1038        assert_eq!(rx.borrow().embeddings_generated, 0);
1039    }
1040
1041    #[tokio::test]
1042    async fn persist_message_assistant_below_min_length_uses_save_only() {
1043        let provider = mock_provider(vec![]);
1044        let channel = MockChannel::new(vec![]);
1045        let registry = create_test_registry();
1046        let executor = MockToolExecutor::no_tools();
1047
1048        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1049        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1050        let cid = memory.sqlite().create_conversation().await.unwrap();
1051
1052        // autosave_assistant=true but min_length=1000 — short content falls back to save_only
1053        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1054            .with_metrics(tx)
1055            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1056        agent.services.memory.persistence.autosave_assistant = true;
1057        agent.services.memory.persistence.autosave_min_length = 1000;
1058
1059        agent
1060            .persist_message(Role::Assistant, "too short", &[], false)
1061            .await;
1062
1063        let history = agent
1064            .services
1065            .memory
1066            .persistence
1067            .memory
1068            .as_ref()
1069            .unwrap()
1070            .sqlite()
1071            .load_history(cid, 50)
1072            .await
1073            .unwrap();
1074        assert_eq!(history.len(), 1, "message must be saved");
1075        assert_eq!(history[0].content, "too short");
1076        assert_eq!(rx.borrow().embeddings_generated, 0);
1077    }
1078
1079    #[tokio::test]
1080    async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1081        // content.len() == autosave_min_length → should_embed = true (>= boundary).
1082        let provider = mock_provider(vec![]);
1083        let channel = MockChannel::new(vec![]);
1084        let registry = create_test_registry();
1085        let executor = MockToolExecutor::no_tools();
1086
1087        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1088        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1089        let cid = memory.sqlite().create_conversation().await.unwrap();
1090
1091        let min_length = 10usize;
1092        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1093            .with_metrics(tx)
1094            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1095        agent.services.memory.persistence.autosave_assistant = true;
1096        agent.services.memory.persistence.autosave_min_length = min_length;
1097
1098        // Exact boundary: len == min_length → embed path.
1099        let content_at_boundary = "A".repeat(min_length);
1100        assert_eq!(content_at_boundary.len(), min_length);
1101        agent
1102            .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1103            .await;
1104
1105        // sqlite_message_count must be incremented regardless of embedding success.
1106        assert_eq!(rx.borrow().sqlite_message_count, 1);
1107    }
1108
1109    #[tokio::test]
1110    async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1111        // content.len() == autosave_min_length - 1 → should_embed = false (below boundary).
1112        let provider = mock_provider(vec![]);
1113        let channel = MockChannel::new(vec![]);
1114        let registry = create_test_registry();
1115        let executor = MockToolExecutor::no_tools();
1116
1117        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1118        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1119        let cid = memory.sqlite().create_conversation().await.unwrap();
1120
1121        let min_length = 10usize;
1122        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1123            .with_metrics(tx)
1124            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1125        agent.services.memory.persistence.autosave_assistant = true;
1126        agent.services.memory.persistence.autosave_min_length = min_length;
1127
1128        // One below boundary: len == min_length - 1 → save_only path, no embedding.
1129        let content_below_boundary = "A".repeat(min_length - 1);
1130        assert_eq!(content_below_boundary.len(), min_length - 1);
1131        agent
1132            .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1133            .await;
1134
1135        let history = agent
1136            .services
1137            .memory
1138            .persistence
1139            .memory
1140            .as_ref()
1141            .unwrap()
1142            .sqlite()
1143            .load_history(cid, 50)
1144            .await
1145            .unwrap();
1146        assert_eq!(history.len(), 1, "message must still be saved");
1147        // save_only path does not embed.
1148        assert_eq!(rx.borrow().embeddings_generated, 0);
1149    }
1150
1151    #[tokio::test]
1152    async fn persist_message_increments_unsummarized_count() {
1153        let provider = mock_provider(vec![]);
1154        let channel = MockChannel::new(vec![]);
1155        let registry = create_test_registry();
1156        let executor = MockToolExecutor::no_tools();
1157
1158        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1159        let cid = memory.sqlite().create_conversation().await.unwrap();
1160
1161        // threshold=100 ensures no summarization is triggered
1162        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1163            std::sync::Arc::new(memory),
1164            cid,
1165            50,
1166            5,
1167            100,
1168        );
1169
1170        assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1171
1172        agent.persist_message(Role::User, "first", &[], false).await;
1173        assert_eq!(agent.services.memory.persistence.unsummarized_count, 1);
1174
1175        agent
1176            .persist_message(Role::User, "second", &[], false)
1177            .await;
1178        assert_eq!(agent.services.memory.persistence.unsummarized_count, 2);
1179    }
1180
1181    #[tokio::test]
1182    async fn check_summarization_resets_counter_on_success() {
1183        let provider = mock_provider(vec![]);
1184        let channel = MockChannel::new(vec![]);
1185        let registry = create_test_registry();
1186        let executor = MockToolExecutor::no_tools();
1187
1188        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1189        let cid = memory.sqlite().create_conversation().await.unwrap();
1190
1191        // threshold=1 so the second persist triggers summarization check (count > threshold)
1192        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1193            std::sync::Arc::new(memory),
1194            cid,
1195            50,
1196            5,
1197            1,
1198        );
1199
1200        agent.persist_message(Role::User, "msg1", &[], false).await;
1201        agent.persist_message(Role::User, "msg2", &[], false).await;
1202
1203        // After summarization attempt (summarize returns Ok(None) since no messages qualify),
1204        // the counter is NOT reset to 0 — only reset on Ok(Some(_)).
1205        // This verifies check_summarization is called and the guard condition works.
1206        // unsummarized_count must be >= 2 before any summarization or 0 if summarization ran.
1207        assert!(agent.services.memory.persistence.unsummarized_count <= 2);
1208    }
1209
1210    #[tokio::test]
1211    async fn unsummarized_count_not_incremented_without_memory() {
1212        let provider = mock_provider(vec![]);
1213        let channel = MockChannel::new(vec![]);
1214        let registry = create_test_registry();
1215        let executor = MockToolExecutor::no_tools();
1216        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1217
1218        agent.persist_message(Role::User, "hello", &[], false).await;
1219        // No memory configured — persist_message returns early, counter must stay 0.
1220        assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1221    }
1222
1223    // R-CRIT-01: unit tests for enqueue_graph_extraction_task guard conditions.
1224    mod graph_extraction_guards {
1225        use super::*;
1226        use crate::config::GraphConfig;
1227        use zeph_llm::provider::MessageMetadata;
1228        use zeph_memory::graph::GraphStore;
1229
1230        fn enabled_graph_config() -> GraphConfig {
1231            GraphConfig {
1232                enabled: true,
1233                ..GraphConfig::default()
1234            }
1235        }
1236
1237        async fn agent_with_graph(
1238            provider: &AnyProvider,
1239            config: GraphConfig,
1240        ) -> Agent<MockChannel> {
1241            let memory =
1242                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1243            let cid = memory.sqlite().create_conversation().await.unwrap();
1244            Agent::new(
1245                provider.clone(),
1246                MockChannel::new(vec![]),
1247                create_test_registry(),
1248                None,
1249                5,
1250                MockToolExecutor::no_tools(),
1251            )
1252            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1253            .with_graph_config(config)
1254        }
1255
1256        #[tokio::test]
1257        async fn injection_flag_guard_skips_extraction() {
1258            // has_injection_flags=true → extraction must be skipped; no counter in graph_metadata.
1259            let provider = mock_provider(vec![]);
1260            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1261            let pool = agent
1262                .services
1263                .memory
1264                .persistence
1265                .memory
1266                .as_ref()
1267                .unwrap()
1268                .sqlite()
1269                .pool()
1270                .clone();
1271
1272            agent
1273                .enqueue_graph_extraction_task("I use Rust", true, false)
1274                .await;
1275
1276            // Give any accidental spawn time to settle.
1277            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1278
1279            let store = GraphStore::new(pool);
1280            let count = store.get_metadata("extraction_count").await.unwrap();
1281            assert!(
1282                count.is_none(),
1283                "injection flag must prevent extraction_count from being written"
1284            );
1285        }
1286
1287        #[tokio::test]
1288        async fn disabled_config_guard_skips_extraction() {
1289            // graph.enabled=false → extraction must be skipped.
1290            let provider = mock_provider(vec![]);
1291            let disabled_cfg = GraphConfig {
1292                enabled: false,
1293                ..GraphConfig::default()
1294            };
1295            let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1296            let pool = agent
1297                .services
1298                .memory
1299                .persistence
1300                .memory
1301                .as_ref()
1302                .unwrap()
1303                .sqlite()
1304                .pool()
1305                .clone();
1306
1307            agent
1308                .enqueue_graph_extraction_task("I use Rust", false, false)
1309                .await;
1310
1311            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1312
1313            let store = GraphStore::new(pool);
1314            let count = store.get_metadata("extraction_count").await.unwrap();
1315            assert!(
1316                count.is_none(),
1317                "disabled graph config must prevent extraction"
1318            );
1319        }
1320
1321        #[tokio::test]
1322        async fn happy_path_fires_extraction() {
1323            // With enabled config and no injection flags, extraction is spawned.
1324            // MockProvider returns None (no entities), but the counter must be incremented.
1325            let provider = mock_provider(vec![]);
1326            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1327            let pool = agent
1328                .services
1329                .memory
1330                .persistence
1331                .memory
1332                .as_ref()
1333                .unwrap()
1334                .sqlite()
1335                .pool()
1336                .clone();
1337
1338            agent
1339                .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1340                .await;
1341
1342            // Wait for the spawned task to complete.
1343            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1344
1345            let store = GraphStore::new(pool);
1346            let count = store.get_metadata("extraction_count").await.unwrap();
1347            assert!(
1348                count.is_some(),
1349                "happy-path extraction must increment extraction_count"
1350            );
1351        }
1352
1353        #[tokio::test]
1354        async fn tool_result_parts_guard_skips_extraction() {
1355            // FIX-1 regression: has_tool_result_parts=true → extraction must be skipped.
1356            // Tool result messages contain raw structured output (TOML, JSON, code) — not
1357            // conversational content. Extracting entities from them produces graph noise.
1358            let provider = mock_provider(vec![]);
1359            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1360            let pool = agent
1361                .services
1362                .memory
1363                .persistence
1364                .memory
1365                .as_ref()
1366                .unwrap()
1367                .sqlite()
1368                .pool()
1369                .clone();
1370
1371            agent
1372                .enqueue_graph_extraction_task(
1373                    "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1374                    false,
1375                    true, // has_tool_result_parts
1376                )
1377                .await;
1378
1379            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1380
1381            let store = GraphStore::new(pool);
1382            let count = store.get_metadata("extraction_count").await.unwrap();
1383            assert!(
1384                count.is_none(),
1385                "tool result message must not trigger graph extraction"
1386            );
1387        }
1388
1389        #[tokio::test]
1390        async fn context_filter_excludes_tool_result_messages() {
1391            // FIX-2: context_messages must not include tool result user messages.
1392            // When enqueue_graph_extraction_task collects context, it filters out
1393            // Role::User messages that contain ToolResult parts — only conversational
1394            // user messages are included as extraction context.
1395            //
1396            // This test verifies the guard fires: a tool result message alone is passed
1397            // (has_tool_result_parts=true) → extraction is skipped entirely, so context
1398            // filtering is not exercised. We verify FIX-2 by ensuring a prior tool result
1399            // message in agent.msg.messages is excluded when a subsequent conversational message
1400            // triggers extraction.
1401            let provider = mock_provider(vec![]);
1402            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1403
1404            // Add a tool result message to the agent's message history — this simulates
1405            // a tool call response that arrived before the current conversational turn.
1406            agent.msg.messages.push(Message {
1407                role: Role::User,
1408                content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1409                parts: vec![MessagePart::ToolResult {
1410                    tool_use_id: "abc".to_owned(),
1411                    content: "provider_type = \"openai\"".to_owned(),
1412                    is_error: false,
1413                }],
1414                metadata: MessageMetadata::default(),
1415            });
1416
1417            let pool = agent
1418                .services
1419                .memory
1420                .persistence
1421                .memory
1422                .as_ref()
1423                .unwrap()
1424                .sqlite()
1425                .pool()
1426                .clone();
1427
1428            // Trigger extraction for a conversational message (not a tool result).
1429            agent
1430                .enqueue_graph_extraction_task(
1431                    "I prefer Rust for systems programming",
1432                    false,
1433                    false,
1434                )
1435                .await;
1436
1437            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1438
1439            // Extraction must have fired (conversational message, no injection flags).
1440            let store = GraphStore::new(pool);
1441            let count = store.get_metadata("extraction_count").await.unwrap();
1442            assert!(
1443                count.is_some(),
1444                "conversational message must trigger extraction even with prior tool result in history"
1445            );
1446        }
1447    }
1448
1449    // R-PERS-01: unit tests for enqueue_persona_extraction_task guard conditions.
1450    mod persona_extraction_guards {
1451        use super::*;
1452        use zeph_config::PersonaConfig;
1453        use zeph_llm::provider::MessageMetadata;
1454
1455        fn enabled_persona_config() -> PersonaConfig {
1456            PersonaConfig {
1457                enabled: true,
1458                min_messages: 1,
1459                ..PersonaConfig::default()
1460            }
1461        }
1462
1463        async fn agent_with_persona(
1464            provider: &AnyProvider,
1465            config: PersonaConfig,
1466        ) -> Agent<MockChannel> {
1467            let memory =
1468                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1469            let cid = memory.sqlite().create_conversation().await.unwrap();
1470            let mut agent = Agent::new(
1471                provider.clone(),
1472                MockChannel::new(vec![]),
1473                create_test_registry(),
1474                None,
1475                5,
1476                MockToolExecutor::no_tools(),
1477            )
1478            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1479            agent.services.memory.extraction.persona_config = config;
1480            agent
1481        }
1482
1483        #[tokio::test]
1484        async fn disabled_config_skips_spawn() {
1485            // persona.enabled=false → nothing is spawned; persona_memory stays empty.
1486            let provider = mock_provider(vec![]);
1487            let mut agent = agent_with_persona(
1488                &provider,
1489                PersonaConfig {
1490                    enabled: false,
1491                    ..PersonaConfig::default()
1492                },
1493            )
1494            .await;
1495
1496            // Inject a user message so message count is above threshold.
1497            agent.msg.messages.push(zeph_llm::provider::Message {
1498                role: Role::User,
1499                content: "I prefer Rust for systems programming".to_owned(),
1500                parts: vec![],
1501                metadata: MessageMetadata::default(),
1502            });
1503
1504            agent.enqueue_persona_extraction_task();
1505
1506            let store = agent
1507                .services
1508                .memory
1509                .persistence
1510                .memory
1511                .as_ref()
1512                .unwrap()
1513                .sqlite()
1514                .clone();
1515            let count = store.count_persona_facts().await.unwrap();
1516            assert_eq!(count, 0, "disabled persona config must not write any facts");
1517        }
1518
1519        #[tokio::test]
1520        async fn below_min_messages_skips_spawn() {
1521            // min_messages=3 but only 2 user messages → should skip.
1522            let provider = mock_provider(vec![]);
1523            let mut agent = agent_with_persona(
1524                &provider,
1525                PersonaConfig {
1526                    enabled: true,
1527                    min_messages: 3,
1528                    ..PersonaConfig::default()
1529                },
1530            )
1531            .await;
1532
1533            for text in ["I use Rust", "I prefer async code"] {
1534                agent.msg.messages.push(zeph_llm::provider::Message {
1535                    role: Role::User,
1536                    content: text.to_owned(),
1537                    parts: vec![],
1538                    metadata: MessageMetadata::default(),
1539                });
1540            }
1541
1542            agent.enqueue_persona_extraction_task();
1543
1544            let store = agent
1545                .services
1546                .memory
1547                .persistence
1548                .memory
1549                .as_ref()
1550                .unwrap()
1551                .sqlite()
1552                .clone();
1553            let count = store.count_persona_facts().await.unwrap();
1554            assert_eq!(
1555                count, 0,
1556                "below min_messages threshold must not trigger extraction"
1557            );
1558        }
1559
1560        #[tokio::test]
1561        async fn no_memory_skips_spawn() {
1562            // memory=None → must exit early without panic.
1563            let provider = mock_provider(vec![]);
1564            let channel = MockChannel::new(vec![]);
1565            let registry = create_test_registry();
1566            let executor = MockToolExecutor::no_tools();
1567            let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1568            agent.services.memory.extraction.persona_config = enabled_persona_config();
1569            agent.msg.messages.push(zeph_llm::provider::Message {
1570                role: Role::User,
1571                content: "I like Rust".to_owned(),
1572                parts: vec![],
1573                metadata: MessageMetadata::default(),
1574            });
1575
1576            // Must not panic even without memory.
1577            agent.enqueue_persona_extraction_task();
1578        }
1579
1580        #[tokio::test]
1581        async fn enabled_enough_messages_spawns_extraction() {
1582            // enabled=true, min_messages=1, self-referential message → extraction runs eagerly
1583            // (not fire-and-forget) and chat() is called on the provider, verified via MockProvider.
1584            use zeph_llm::mock::MockProvider;
1585            let (mock, recorded) = MockProvider::default().with_recording();
1586            let provider = AnyProvider::Mock(mock);
1587            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1588
1589            agent.msg.messages.push(zeph_llm::provider::Message {
1590                role: Role::User,
1591                content: "I prefer Rust for systems programming".to_owned(),
1592                parts: vec![],
1593                metadata: MessageMetadata::default(),
1594            });
1595
1596            agent.enqueue_persona_extraction_task();
1597
1598            // Persona extraction runs via BackgroundSupervisor. Wait for tasks to complete.
1599            agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1600
1601            let calls = recorded.lock().unwrap();
1602            assert!(
1603                !calls.is_empty(),
1604                "happy-path: provider.chat() must be called when extraction completes"
1605            );
1606        }
1607
1608        #[tokio::test]
1609        async fn messages_capped_at_eight() {
1610            // More than 8 user messages → only 8 are passed to extraction.
1611            // Each message contains "I" so self-referential gate passes.
1612            use zeph_llm::mock::MockProvider;
1613            let (mock, recorded) = MockProvider::default().with_recording();
1614            let provider = AnyProvider::Mock(mock);
1615            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1616
1617            for i in 0..12u32 {
1618                agent.msg.messages.push(zeph_llm::provider::Message {
1619                    role: Role::User,
1620                    content: format!("I like message {i}"),
1621                    parts: vec![],
1622                    metadata: MessageMetadata::default(),
1623                });
1624            }
1625
1626            agent.enqueue_persona_extraction_task();
1627
1628            // Allow the background task to complete before asserting.
1629            agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1630
1631            // Verify extraction ran (provider was called).
1632            let calls = recorded.lock().unwrap();
1633            assert!(
1634                !calls.is_empty(),
1635                "extraction must run when enough messages present"
1636            );
1637            // Verify the prompt sent to the provider does not contain messages beyond the 8th.
1638            let prompt = &calls[0];
1639            let user_text = prompt
1640                .iter()
1641                .filter(|m| m.role == Role::User)
1642                .map(|m| m.content.as_str())
1643                .collect::<Vec<_>>()
1644                .join(" ");
1645            // Messages 8..11 ("I like message 8".."I like message 11") must not appear.
1646            assert!(
1647                !user_text.contains("I like message 8"),
1648                "message index 8 must be excluded from extraction input"
1649            );
1650        }
1651
1652        #[test]
1653        fn long_message_truncated_at_char_boundary() {
1654            // Directly verify the per-message truncation logic applied in
1655            // enqueue_persona_extraction_task: a content > 2048 bytes must be capped
1656            // to exactly floor_char_boundary(2048).
1657            let long_content = "x".repeat(3000);
1658            let truncated = if long_content.len() > 2048 {
1659                long_content[..long_content.floor_char_boundary(2048)].to_owned()
1660            } else {
1661                long_content.clone()
1662            };
1663            assert_eq!(
1664                truncated.len(),
1665                2048,
1666                "ASCII content must be truncated to exactly 2048 bytes"
1667            );
1668
1669            // Multi-byte boundary: build a string whose char boundary falls before 2048.
1670            let multi = "é".repeat(1500); // each 'é' is 2 bytes → 3000 bytes total
1671            let truncated_multi = if multi.len() > 2048 {
1672                multi[..multi.floor_char_boundary(2048)].to_owned()
1673            } else {
1674                multi.clone()
1675            };
1676            assert!(
1677                truncated_multi.len() <= 2048,
1678                "multi-byte content must not exceed 2048 bytes"
1679            );
1680            assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1681        }
1682    }
1683
1684    #[tokio::test]
1685    async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1686        let provider = mock_provider(vec![]);
1687        let channel = MockChannel::new(vec![]);
1688        let registry = create_test_registry();
1689        let executor = MockToolExecutor::no_tools();
1690
1691        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1692        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1693        let cid = memory.sqlite().create_conversation().await.unwrap();
1694
1695        // autosave_assistant=false — but User role always takes embedding path
1696        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1697            .with_metrics(tx)
1698            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1699        agent.services.memory.persistence.autosave_assistant = false;
1700        agent.services.memory.persistence.autosave_min_length = 20;
1701
1702        let long_user_msg = "A".repeat(100);
1703        agent
1704            .persist_message(Role::User, &long_user_msg, &[], false)
1705            .await;
1706
1707        let history = agent
1708            .services
1709            .memory
1710            .persistence
1711            .memory
1712            .as_ref()
1713            .unwrap()
1714            .sqlite()
1715            .load_history(cid, 50)
1716            .await
1717            .unwrap();
1718        assert_eq!(history.len(), 1, "user message must be saved");
1719        // User messages go through remember_with_parts (embedding path).
1720        // sqlite_message_count must increment regardless of Qdrant availability.
1721        assert_eq!(rx.borrow().sqlite_message_count, 1);
1722    }
1723
1724    // Round-trip tests: verify that persist_message saves the correct parts and they
1725    // are restored correctly by load_history.
1726
1727    #[tokio::test]
1728    async fn persist_message_saves_correct_tool_use_parts() {
1729        use zeph_llm::provider::MessagePart;
1730
1731        let provider = mock_provider(vec![]);
1732        let channel = MockChannel::new(vec![]);
1733        let registry = create_test_registry();
1734        let executor = MockToolExecutor::no_tools();
1735
1736        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1737        let cid = memory.sqlite().create_conversation().await.unwrap();
1738
1739        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1740            std::sync::Arc::new(memory),
1741            cid,
1742            50,
1743            5,
1744            100,
1745        );
1746
1747        let parts = vec![MessagePart::ToolUse {
1748            id: "call_abc123".to_string(),
1749            name: "read_file".to_string(),
1750            input: serde_json::json!({"path": "/tmp/test.txt"}),
1751        }];
1752        let content = "[tool_use: read_file(call_abc123)]";
1753
1754        agent
1755            .persist_message(Role::Assistant, content, &parts, false)
1756            .await;
1757
1758        let history = agent
1759            .services
1760            .memory
1761            .persistence
1762            .memory
1763            .as_ref()
1764            .unwrap()
1765            .sqlite()
1766            .load_history(cid, 50)
1767            .await
1768            .unwrap();
1769
1770        assert_eq!(history.len(), 1);
1771        assert_eq!(history[0].role, Role::Assistant);
1772        assert_eq!(history[0].content, content);
1773        assert_eq!(history[0].parts.len(), 1);
1774        match &history[0].parts[0] {
1775            MessagePart::ToolUse { id, name, .. } => {
1776                assert_eq!(id, "call_abc123");
1777                assert_eq!(name, "read_file");
1778            }
1779            other => panic!("expected ToolUse part, got {other:?}"),
1780        }
1781        // Regression guard: assistant message must NOT have ToolResult parts
1782        assert!(
1783            !history[0]
1784                .parts
1785                .iter()
1786                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1787            "assistant message must not contain ToolResult parts"
1788        );
1789    }
1790
1791    #[tokio::test]
1792    async fn persist_message_saves_correct_tool_result_parts() {
1793        use zeph_llm::provider::MessagePart;
1794
1795        let provider = mock_provider(vec![]);
1796        let channel = MockChannel::new(vec![]);
1797        let registry = create_test_registry();
1798        let executor = MockToolExecutor::no_tools();
1799
1800        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1801        let cid = memory.sqlite().create_conversation().await.unwrap();
1802
1803        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1804            std::sync::Arc::new(memory),
1805            cid,
1806            50,
1807            5,
1808            100,
1809        );
1810
1811        let parts = vec![MessagePart::ToolResult {
1812            tool_use_id: "call_abc123".to_string(),
1813            content: "file contents here".to_string(),
1814            is_error: false,
1815        }];
1816        let content = "[tool_result: call_abc123]\nfile contents here";
1817
1818        agent
1819            .persist_message(Role::User, content, &parts, false)
1820            .await;
1821
1822        let history = agent
1823            .services
1824            .memory
1825            .persistence
1826            .memory
1827            .as_ref()
1828            .unwrap()
1829            .sqlite()
1830            .load_history(cid, 50)
1831            .await
1832            .unwrap();
1833
1834        assert_eq!(history.len(), 1);
1835        assert_eq!(history[0].role, Role::User);
1836        assert_eq!(history[0].content, content);
1837        assert_eq!(history[0].parts.len(), 1);
1838        match &history[0].parts[0] {
1839            MessagePart::ToolResult {
1840                tool_use_id,
1841                content: result_content,
1842                is_error,
1843            } => {
1844                assert_eq!(tool_use_id, "call_abc123");
1845                assert_eq!(result_content, "file contents here");
1846                assert!(!is_error);
1847            }
1848            other => panic!("expected ToolResult part, got {other:?}"),
1849        }
1850        // Regression guard: user message with ToolResult must NOT have ToolUse parts
1851        assert!(
1852            !history[0]
1853                .parts
1854                .iter()
1855                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1856            "user ToolResult message must not contain ToolUse parts"
1857        );
1858    }
1859
1860    #[tokio::test]
1861    async fn persist_message_roundtrip_preserves_role_part_alignment() {
1862        use zeph_llm::provider::MessagePart;
1863
1864        let provider = mock_provider(vec![]);
1865        let channel = MockChannel::new(vec![]);
1866        let registry = create_test_registry();
1867        let executor = MockToolExecutor::no_tools();
1868
1869        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1870        let cid = memory.sqlite().create_conversation().await.unwrap();
1871
1872        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1873            std::sync::Arc::new(memory),
1874            cid,
1875            50,
1876            5,
1877            100,
1878        );
1879
1880        // Persist assistant message with ToolUse parts
1881        let assistant_parts = vec![MessagePart::ToolUse {
1882            id: "id_1".to_string(),
1883            name: "list_dir".to_string(),
1884            input: serde_json::json!({"path": "/tmp"}),
1885        }];
1886        agent
1887            .persist_message(
1888                Role::Assistant,
1889                "[tool_use: list_dir(id_1)]",
1890                &assistant_parts,
1891                false,
1892            )
1893            .await;
1894
1895        // Persist user message with ToolResult parts
1896        let user_parts = vec![MessagePart::ToolResult {
1897            tool_use_id: "id_1".to_string(),
1898            content: "file1.txt\nfile2.txt".to_string(),
1899            is_error: false,
1900        }];
1901        agent
1902            .persist_message(
1903                Role::User,
1904                "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1905                &user_parts,
1906                false,
1907            )
1908            .await;
1909
1910        let history = agent
1911            .services
1912            .memory
1913            .persistence
1914            .memory
1915            .as_ref()
1916            .unwrap()
1917            .sqlite()
1918            .load_history(cid, 50)
1919            .await
1920            .unwrap();
1921
1922        assert_eq!(history.len(), 2);
1923
1924        // First message: assistant + ToolUse
1925        assert_eq!(history[0].role, Role::Assistant);
1926        assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1927        assert!(
1928            matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1929            "first message must be assistant ToolUse"
1930        );
1931
1932        // Second message: user + ToolResult
1933        assert_eq!(history[1].role, Role::User);
1934        assert_eq!(
1935            history[1].content,
1936            "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1937        );
1938        assert!(
1939            matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1940            "second message must be user ToolResult"
1941        );
1942
1943        // Cross-role regression guard: no swapped parts
1944        assert!(
1945            !history[0]
1946                .parts
1947                .iter()
1948                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1949            "assistant message must not have ToolResult parts"
1950        );
1951        assert!(
1952            !history[1]
1953                .parts
1954                .iter()
1955                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1956            "user message must not have ToolUse parts"
1957        );
1958    }
1959
1960    #[tokio::test]
1961    async fn persist_message_saves_correct_tool_output_parts() {
1962        use zeph_llm::provider::MessagePart;
1963
1964        let provider = mock_provider(vec![]);
1965        let channel = MockChannel::new(vec![]);
1966        let registry = create_test_registry();
1967        let executor = MockToolExecutor::no_tools();
1968
1969        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1970        let cid = memory.sqlite().create_conversation().await.unwrap();
1971
1972        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1973            std::sync::Arc::new(memory),
1974            cid,
1975            50,
1976            5,
1977            100,
1978        );
1979
1980        let parts = vec![MessagePart::ToolOutput {
1981            tool_name: "shell".into(),
1982            body: "hello from shell".to_string(),
1983            compacted_at: None,
1984        }];
1985        let content = "[tool: shell]\nhello from shell";
1986
1987        agent
1988            .persist_message(Role::User, content, &parts, false)
1989            .await;
1990
1991        let history = agent
1992            .services
1993            .memory
1994            .persistence
1995            .memory
1996            .as_ref()
1997            .unwrap()
1998            .sqlite()
1999            .load_history(cid, 50)
2000            .await
2001            .unwrap();
2002
2003        assert_eq!(history.len(), 1);
2004        assert_eq!(history[0].role, Role::User);
2005        assert_eq!(history[0].content, content);
2006        assert_eq!(history[0].parts.len(), 1);
2007        match &history[0].parts[0] {
2008            MessagePart::ToolOutput {
2009                tool_name,
2010                body,
2011                compacted_at,
2012            } => {
2013                assert_eq!(tool_name, "shell");
2014                assert_eq!(body, "hello from shell");
2015                assert!(compacted_at.is_none());
2016            }
2017            other => panic!("expected ToolOutput part, got {other:?}"),
2018        }
2019    }
2020
2021    // --- sanitize_tool_pairs unit tests ---
2022
2023    #[tokio::test]
2024    async fn load_history_removes_trailing_orphan_tool_use() {
2025        use zeph_llm::provider::MessagePart;
2026
2027        let provider = mock_provider(vec![]);
2028        let channel = MockChannel::new(vec![]);
2029        let registry = create_test_registry();
2030        let executor = MockToolExecutor::no_tools();
2031
2032        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2033        let cid = memory.sqlite().create_conversation().await.unwrap();
2034        let sqlite = memory.sqlite();
2035
2036        // user message (normal)
2037        sqlite
2038            .save_message(cid, "user", "do something with a tool")
2039            .await
2040            .unwrap();
2041
2042        // assistant message with ToolUse parts — no following tool_result (orphan)
2043        let parts = serde_json::to_string(&[MessagePart::ToolUse {
2044            id: "call_orphan".to_string(),
2045            name: "shell".to_string(),
2046            input: serde_json::json!({"command": "ls"}),
2047        }])
2048        .unwrap();
2049        sqlite
2050            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2051            .await
2052            .unwrap();
2053
2054        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2055            std::sync::Arc::new(memory),
2056            cid,
2057            50,
2058            5,
2059            100,
2060        );
2061
2062        let messages_before = agent.msg.messages.len();
2063        agent.load_history().await.unwrap();
2064
2065        // Only the user message should be loaded; orphaned assistant tool_use removed.
2066        assert_eq!(
2067            agent.msg.messages.len(),
2068            messages_before + 1,
2069            "orphaned trailing tool_use must be removed"
2070        );
2071        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2072    }
2073
2074    #[tokio::test]
2075    async fn load_history_removes_leading_orphan_tool_result() {
2076        use zeph_llm::provider::MessagePart;
2077
2078        let provider = mock_provider(vec![]);
2079        let channel = MockChannel::new(vec![]);
2080        let registry = create_test_registry();
2081        let executor = MockToolExecutor::no_tools();
2082
2083        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2084        let cid = memory.sqlite().create_conversation().await.unwrap();
2085        let sqlite = memory.sqlite();
2086
2087        // Leading orphan: user message with ToolResult but no preceding tool_use
2088        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2089            tool_use_id: "call_missing".to_string(),
2090            content: "result data".to_string(),
2091            is_error: false,
2092        }])
2093        .unwrap();
2094        sqlite
2095            .save_message_with_parts(
2096                cid,
2097                "user",
2098                "[tool_result: call_missing]\nresult data",
2099                &result_parts,
2100            )
2101            .await
2102            .unwrap();
2103
2104        // A valid assistant reply after the orphan
2105        sqlite
2106            .save_message(cid, "assistant", "here is my response")
2107            .await
2108            .unwrap();
2109
2110        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2111            std::sync::Arc::new(memory),
2112            cid,
2113            50,
2114            5,
2115            100,
2116        );
2117
2118        let messages_before = agent.msg.messages.len();
2119        agent.load_history().await.unwrap();
2120
2121        // Orphaned leading tool_result removed; only assistant message kept.
2122        assert_eq!(
2123            agent.msg.messages.len(),
2124            messages_before + 1,
2125            "orphaned leading tool_result must be removed"
2126        );
2127        assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2128    }
2129
2130    #[tokio::test]
2131    async fn load_history_preserves_complete_tool_pairs() {
2132        use zeph_llm::provider::MessagePart;
2133
2134        let provider = mock_provider(vec![]);
2135        let channel = MockChannel::new(vec![]);
2136        let registry = create_test_registry();
2137        let executor = MockToolExecutor::no_tools();
2138
2139        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2140        let cid = memory.sqlite().create_conversation().await.unwrap();
2141        let sqlite = memory.sqlite();
2142
2143        // Complete tool_use / tool_result pair
2144        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2145            id: "call_ok".to_string(),
2146            name: "shell".to_string(),
2147            input: serde_json::json!({"command": "pwd"}),
2148        }])
2149        .unwrap();
2150        sqlite
2151            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2152            .await
2153            .unwrap();
2154
2155        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2156            tool_use_id: "call_ok".to_string(),
2157            content: "/home/user".to_string(),
2158            is_error: false,
2159        }])
2160        .unwrap();
2161        sqlite
2162            .save_message_with_parts(
2163                cid,
2164                "user",
2165                "[tool_result: call_ok]\n/home/user",
2166                &result_parts,
2167            )
2168            .await
2169            .unwrap();
2170
2171        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2172            std::sync::Arc::new(memory),
2173            cid,
2174            50,
2175            5,
2176            100,
2177        );
2178
2179        let messages_before = agent.msg.messages.len();
2180        agent.load_history().await.unwrap();
2181
2182        // Both messages must be preserved.
2183        assert_eq!(
2184            agent.msg.messages.len(),
2185            messages_before + 2,
2186            "complete tool_use/tool_result pair must be preserved"
2187        );
2188        assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2189        assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2190    }
2191
2192    #[tokio::test]
2193    async fn load_history_handles_multiple_trailing_orphans() {
2194        use zeph_llm::provider::MessagePart;
2195
2196        let provider = mock_provider(vec![]);
2197        let channel = MockChannel::new(vec![]);
2198        let registry = create_test_registry();
2199        let executor = MockToolExecutor::no_tools();
2200
2201        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2202        let cid = memory.sqlite().create_conversation().await.unwrap();
2203        let sqlite = memory.sqlite();
2204
2205        // Normal user message
2206        sqlite.save_message(cid, "user", "start").await.unwrap();
2207
2208        // First orphaned tool_use
2209        let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2210            id: "call_1".to_string(),
2211            name: "shell".to_string(),
2212            input: serde_json::json!({}),
2213        }])
2214        .unwrap();
2215        sqlite
2216            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2217            .await
2218            .unwrap();
2219
2220        // Second orphaned tool_use (consecutive, no tool_result between them)
2221        let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2222            id: "call_2".to_string(),
2223            name: "read_file".to_string(),
2224            input: serde_json::json!({}),
2225        }])
2226        .unwrap();
2227        sqlite
2228            .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2229            .await
2230            .unwrap();
2231
2232        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2233            std::sync::Arc::new(memory),
2234            cid,
2235            50,
2236            5,
2237            100,
2238        );
2239
2240        let messages_before = agent.msg.messages.len();
2241        agent.load_history().await.unwrap();
2242
2243        // Both orphaned tool_use messages removed; only the user message kept.
2244        assert_eq!(
2245            agent.msg.messages.len(),
2246            messages_before + 1,
2247            "all trailing orphaned tool_use messages must be removed"
2248        );
2249        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2250    }
2251
2252    #[tokio::test]
2253    async fn load_history_no_tool_messages_unchanged() {
2254        let provider = mock_provider(vec![]);
2255        let channel = MockChannel::new(vec![]);
2256        let registry = create_test_registry();
2257        let executor = MockToolExecutor::no_tools();
2258
2259        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2260        let cid = memory.sqlite().create_conversation().await.unwrap();
2261        let sqlite = memory.sqlite();
2262
2263        sqlite.save_message(cid, "user", "hello").await.unwrap();
2264        sqlite
2265            .save_message(cid, "assistant", "hi there")
2266            .await
2267            .unwrap();
2268        sqlite
2269            .save_message(cid, "user", "how are you?")
2270            .await
2271            .unwrap();
2272
2273        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2274            std::sync::Arc::new(memory),
2275            cid,
2276            50,
2277            5,
2278            100,
2279        );
2280
2281        let messages_before = agent.msg.messages.len();
2282        agent.load_history().await.unwrap();
2283
2284        // All three plain messages must be preserved.
2285        assert_eq!(
2286            agent.msg.messages.len(),
2287            messages_before + 3,
2288            "plain messages without tool parts must pass through unchanged"
2289        );
2290    }
2291
2292    #[tokio::test]
2293    async fn load_history_removes_both_leading_and_trailing_orphans() {
2294        use zeph_llm::provider::MessagePart;
2295
2296        let provider = mock_provider(vec![]);
2297        let channel = MockChannel::new(vec![]);
2298        let registry = create_test_registry();
2299        let executor = MockToolExecutor::no_tools();
2300
2301        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2302        let cid = memory.sqlite().create_conversation().await.unwrap();
2303        let sqlite = memory.sqlite();
2304
2305        // Leading orphan: user message with ToolResult, no preceding tool_use
2306        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2307            tool_use_id: "call_leading".to_string(),
2308            content: "orphaned result".to_string(),
2309            is_error: false,
2310        }])
2311        .unwrap();
2312        sqlite
2313            .save_message_with_parts(
2314                cid,
2315                "user",
2316                "[tool_result: call_leading]\norphaned result",
2317                &result_parts,
2318            )
2319            .await
2320            .unwrap();
2321
2322        // Valid middle messages
2323        sqlite
2324            .save_message(cid, "user", "what is 2+2?")
2325            .await
2326            .unwrap();
2327        sqlite.save_message(cid, "assistant", "4").await.unwrap();
2328
2329        // Trailing orphan: assistant message with ToolUse, no following tool_result
2330        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2331            id: "call_trailing".to_string(),
2332            name: "shell".to_string(),
2333            input: serde_json::json!({"command": "date"}),
2334        }])
2335        .unwrap();
2336        sqlite
2337            .save_message_with_parts(
2338                cid,
2339                "assistant",
2340                "[tool_use: shell(call_trailing)]",
2341                &use_parts,
2342            )
2343            .await
2344            .unwrap();
2345
2346        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2347            std::sync::Arc::new(memory),
2348            cid,
2349            50,
2350            5,
2351            100,
2352        );
2353
2354        let messages_before = agent.msg.messages.len();
2355        agent.load_history().await.unwrap();
2356
2357        // Both orphans removed; only the 2 valid middle messages kept.
2358        assert_eq!(
2359            agent.msg.messages.len(),
2360            messages_before + 2,
2361            "both leading and trailing orphans must be removed"
2362        );
2363        assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2364        assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2365        assert_eq!(
2366            agent.msg.messages[messages_before + 1].role,
2367            Role::Assistant
2368        );
2369        assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2370    }
2371
2372    /// RC1 regression: mid-history assistant[`ToolUse`] without a following user[`ToolResult`]
2373    /// must have its `ToolUse` parts stripped (text preserved). The message count stays the same
2374    /// because the assistant message has a text content fallback; only `ToolUse` parts are
2375    /// removed.
2376    #[tokio::test]
2377    async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2378        use zeph_llm::provider::MessagePart;
2379
2380        let provider = mock_provider(vec![]);
2381        let channel = MockChannel::new(vec![]);
2382        let registry = create_test_registry();
2383        let executor = MockToolExecutor::no_tools();
2384
2385        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2386        let cid = memory.sqlite().create_conversation().await.unwrap();
2387        let sqlite = memory.sqlite();
2388
2389        // Normal first exchange.
2390        sqlite
2391            .save_message(cid, "user", "first question")
2392            .await
2393            .unwrap();
2394        sqlite
2395            .save_message(cid, "assistant", "first answer")
2396            .await
2397            .unwrap();
2398
2399        // Mid-history orphan: assistant with ToolUse but NO following ToolResult user message.
2400        // This models the compaction-split scenario (RC2) where replace_conversation hid the
2401        // user[ToolResult] but left the assistant[ToolUse] visible.
2402        let use_parts = serde_json::to_string(&[
2403            MessagePart::ToolUse {
2404                id: "call_mid_1".to_string(),
2405                name: "shell".to_string(),
2406                input: serde_json::json!({"command": "ls"}),
2407            },
2408            MessagePart::Text {
2409                text: "Let me check the files.".to_string(),
2410            },
2411        ])
2412        .unwrap();
2413        sqlite
2414            .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2415            .await
2416            .unwrap();
2417
2418        // Another normal exchange after the orphan.
2419        sqlite
2420            .save_message(cid, "user", "second question")
2421            .await
2422            .unwrap();
2423        sqlite
2424            .save_message(cid, "assistant", "second answer")
2425            .await
2426            .unwrap();
2427
2428        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2429            std::sync::Arc::new(memory),
2430            cid,
2431            50,
2432            5,
2433            100,
2434        );
2435
2436        let messages_before = agent.msg.messages.len();
2437        agent.load_history().await.unwrap();
2438
2439        // All 5 messages remain (orphan message kept because it has text), but the orphaned
2440        // message must have its ToolUse parts stripped.
2441        assert_eq!(
2442            agent.msg.messages.len(),
2443            messages_before + 5,
2444            "message count must be 5 (orphan message kept — has text content)"
2445        );
2446
2447        // The orphaned assistant message (index 2 in the loaded slice) must have no ToolUse parts.
2448        let orphan = &agent.msg.messages[messages_before + 2];
2449        assert_eq!(orphan.role, Role::Assistant);
2450        assert!(
2451            !orphan
2452                .parts
2453                .iter()
2454                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2455            "orphaned ToolUse parts must be stripped from mid-history message"
2456        );
2457        // Text part must be preserved.
2458        assert!(
2459            orphan.parts.iter().any(
2460                |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2461            ),
2462            "text content of orphaned assistant message must be preserved"
2463        );
2464    }
2465
2466    /// RC3 regression: a user message with empty `content` but non-empty `parts` (`ToolResult`)
2467    /// must NOT be skipped by `load_history`. Previously the empty-content check dropped these
2468    /// messages before `sanitize_tool_pairs` ran, leaving the preceding assistant `ToolUse`
2469    /// orphaned.
2470    #[tokio::test]
2471    async fn load_history_keeps_tool_only_user_message() {
2472        use zeph_llm::provider::MessagePart;
2473
2474        let provider = mock_provider(vec![]);
2475        let channel = MockChannel::new(vec![]);
2476        let registry = create_test_registry();
2477        let executor = MockToolExecutor::no_tools();
2478
2479        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2480        let cid = memory.sqlite().create_conversation().await.unwrap();
2481        let sqlite = memory.sqlite();
2482
2483        // Assistant sends a ToolUse.
2484        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2485            id: "call_rc3".to_string(),
2486            name: "memory_save".to_string(),
2487            input: serde_json::json!({"content": "something"}),
2488        }])
2489        .unwrap();
2490        sqlite
2491            .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2492            .await
2493            .unwrap();
2494
2495        // User message has empty text content but carries ToolResult in parts — native tool pattern.
2496        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2497            tool_use_id: "call_rc3".to_string(),
2498            content: "saved".to_string(),
2499            is_error: false,
2500        }])
2501        .unwrap();
2502        sqlite
2503            .save_message_with_parts(cid, "user", "", &result_parts)
2504            .await
2505            .unwrap();
2506
2507        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2508
2509        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2510            std::sync::Arc::new(memory),
2511            cid,
2512            50,
2513            5,
2514            100,
2515        );
2516
2517        let messages_before = agent.msg.messages.len();
2518        agent.load_history().await.unwrap();
2519
2520        // All 3 messages must be loaded — the empty-content ToolResult user message must NOT be
2521        // dropped.
2522        assert_eq!(
2523            agent.msg.messages.len(),
2524            messages_before + 3,
2525            "user message with empty content but ToolResult parts must not be dropped"
2526        );
2527
2528        // The user message at index 1 must still carry the ToolResult part.
2529        let user_msg = &agent.msg.messages[messages_before + 1];
2530        assert_eq!(user_msg.role, Role::User);
2531        assert!(
2532            user_msg.parts.iter().any(
2533                |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2534            ),
2535            "ToolResult part must be preserved on user message with empty content"
2536        );
2537    }
2538
2539    /// RC2 reverse pass: a user message with a `ToolResult` whose `tool_use_id` has no matching
2540    /// `ToolUse` in the preceding assistant message must be stripped by
2541    /// `strip_mid_history_orphans`.
2542    #[tokio::test]
2543    async fn strip_orphans_removes_orphaned_tool_result() {
2544        use zeph_llm::provider::MessagePart;
2545
2546        let provider = mock_provider(vec![]);
2547        let channel = MockChannel::new(vec![]);
2548        let registry = create_test_registry();
2549        let executor = MockToolExecutor::no_tools();
2550
2551        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2552        let cid = memory.sqlite().create_conversation().await.unwrap();
2553        let sqlite = memory.sqlite();
2554
2555        // Normal exchange before the orphan.
2556        sqlite.save_message(cid, "user", "hello").await.unwrap();
2557        sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2558
2559        // Assistant message that does NOT contain a ToolUse.
2560        sqlite
2561            .save_message(cid, "assistant", "plain answer")
2562            .await
2563            .unwrap();
2564
2565        // User message references a tool_use_id that was never sent by the preceding assistant.
2566        let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2567            tool_use_id: "call_nonexistent".to_string(),
2568            content: "stale result".to_string(),
2569            is_error: false,
2570        }])
2571        .unwrap();
2572        sqlite
2573            .save_message_with_parts(
2574                cid,
2575                "user",
2576                "[tool_result: call_nonexistent]\nstale result",
2577                &orphan_result_parts,
2578            )
2579            .await
2580            .unwrap();
2581
2582        sqlite
2583            .save_message(cid, "assistant", "final")
2584            .await
2585            .unwrap();
2586
2587        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2588            std::sync::Arc::new(memory),
2589            cid,
2590            50,
2591            5,
2592            100,
2593        );
2594
2595        let messages_before = agent.msg.messages.len();
2596        agent.load_history().await.unwrap();
2597
2598        // The orphaned ToolResult part must have been stripped from the user message.
2599        // The user message itself may be removed (parts empty + content non-empty) or kept with
2600        // the text content — but it must NOT retain the orphaned ToolResult part.
2601        let loaded = &agent.msg.messages[messages_before..];
2602        for msg in loaded {
2603            assert!(
2604                !msg.parts.iter().any(|p| matches!(
2605                    p,
2606                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2607                )),
2608                "orphaned ToolResult part must be stripped from history"
2609            );
2610        }
2611    }
2612
2613    /// RC2 reverse pass: a complete `tool_use` + `tool_result` pair must pass through the reverse
2614    /// orphan check intact; the fix must not strip valid `ToolResult` parts.
2615    #[tokio::test]
2616    async fn strip_orphans_keeps_complete_pair() {
2617        use zeph_llm::provider::MessagePart;
2618
2619        let provider = mock_provider(vec![]);
2620        let channel = MockChannel::new(vec![]);
2621        let registry = create_test_registry();
2622        let executor = MockToolExecutor::no_tools();
2623
2624        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2625        let cid = memory.sqlite().create_conversation().await.unwrap();
2626        let sqlite = memory.sqlite();
2627
2628        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2629            id: "call_valid".to_string(),
2630            name: "shell".to_string(),
2631            input: serde_json::json!({"command": "ls"}),
2632        }])
2633        .unwrap();
2634        sqlite
2635            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2636            .await
2637            .unwrap();
2638
2639        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2640            tool_use_id: "call_valid".to_string(),
2641            content: "file.rs".to_string(),
2642            is_error: false,
2643        }])
2644        .unwrap();
2645        sqlite
2646            .save_message_with_parts(cid, "user", "", &result_parts)
2647            .await
2648            .unwrap();
2649
2650        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2651            std::sync::Arc::new(memory),
2652            cid,
2653            50,
2654            5,
2655            100,
2656        );
2657
2658        let messages_before = agent.msg.messages.len();
2659        agent.load_history().await.unwrap();
2660
2661        assert_eq!(
2662            agent.msg.messages.len(),
2663            messages_before + 2,
2664            "complete tool_use/tool_result pair must be preserved"
2665        );
2666
2667        let user_msg = &agent.msg.messages[messages_before + 1];
2668        assert!(
2669            user_msg.parts.iter().any(|p| matches!(
2670                p,
2671                MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2672            )),
2673            "ToolResult part for a matched tool_use must not be stripped"
2674        );
2675    }
2676
2677    /// RC2 reverse pass: history with a mix of complete pairs and orphaned `ToolResult` messages.
2678    /// Orphaned `ToolResult` parts must be stripped; complete pairs must pass through intact.
2679    #[tokio::test]
2680    async fn strip_orphans_mixed_history() {
2681        use zeph_llm::provider::MessagePart;
2682
2683        let provider = mock_provider(vec![]);
2684        let channel = MockChannel::new(vec![]);
2685        let registry = create_test_registry();
2686        let executor = MockToolExecutor::no_tools();
2687
2688        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2689        let cid = memory.sqlite().create_conversation().await.unwrap();
2690        let sqlite = memory.sqlite();
2691
2692        // First: complete tool_use / tool_result pair.
2693        let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2694            id: "call_good".to_string(),
2695            name: "shell".to_string(),
2696            input: serde_json::json!({"command": "pwd"}),
2697        }])
2698        .unwrap();
2699        sqlite
2700            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2701            .await
2702            .unwrap();
2703
2704        let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2705            tool_use_id: "call_good".to_string(),
2706            content: "/home".to_string(),
2707            is_error: false,
2708        }])
2709        .unwrap();
2710        sqlite
2711            .save_message_with_parts(cid, "user", "", &result_parts_ok)
2712            .await
2713            .unwrap();
2714
2715        // Second: plain assistant message followed by an orphaned ToolResult user message.
2716        sqlite
2717            .save_message(cid, "assistant", "text only")
2718            .await
2719            .unwrap();
2720
2721        let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2722            tool_use_id: "call_ghost".to_string(),
2723            content: "ghost result".to_string(),
2724            is_error: false,
2725        }])
2726        .unwrap();
2727        sqlite
2728            .save_message_with_parts(
2729                cid,
2730                "user",
2731                "[tool_result: call_ghost]\nghost result",
2732                &orphan_parts,
2733            )
2734            .await
2735            .unwrap();
2736
2737        sqlite
2738            .save_message(cid, "assistant", "final reply")
2739            .await
2740            .unwrap();
2741
2742        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2743            std::sync::Arc::new(memory),
2744            cid,
2745            50,
2746            5,
2747            100,
2748        );
2749
2750        let messages_before = agent.msg.messages.len();
2751        agent.load_history().await.unwrap();
2752
2753        let loaded = &agent.msg.messages[messages_before..];
2754
2755        // The orphaned ToolResult part must not appear in any message.
2756        for msg in loaded {
2757            assert!(
2758                !msg.parts.iter().any(|p| matches!(
2759                    p,
2760                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2761                )),
2762                "orphaned ToolResult (call_ghost) must be stripped from history"
2763            );
2764        }
2765
2766        // The matched ToolResult must still be present on the user message following the
2767        // first assistant message.
2768        let has_good_result = loaded.iter().any(|msg| {
2769            msg.role == Role::User
2770                && msg.parts.iter().any(|p| {
2771                    matches!(
2772                        p,
2773                        MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2774                    )
2775                })
2776        });
2777        assert!(
2778            has_good_result,
2779            "matched ToolResult (call_good) must be preserved in history"
2780        );
2781    }
2782
2783    /// Regression: a properly matched `tool_use`/`tool_result` pair must NOT be touched by the
2784    /// mid-history scan — ensures the fix doesn't break valid tool exchanges.
2785    #[tokio::test]
2786    async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2787        use zeph_llm::provider::MessagePart;
2788
2789        let provider = mock_provider(vec![]);
2790        let channel = MockChannel::new(vec![]);
2791        let registry = create_test_registry();
2792        let executor = MockToolExecutor::no_tools();
2793
2794        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2795        let cid = memory.sqlite().create_conversation().await.unwrap();
2796        let sqlite = memory.sqlite();
2797
2798        sqlite
2799            .save_message(cid, "user", "run a command")
2800            .await
2801            .unwrap();
2802
2803        // Assistant sends a ToolUse.
2804        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2805            id: "call_ok".to_string(),
2806            name: "shell".to_string(),
2807            input: serde_json::json!({"command": "echo hi"}),
2808        }])
2809        .unwrap();
2810        sqlite
2811            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2812            .await
2813            .unwrap();
2814
2815        // Matching user ToolResult follows.
2816        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2817            tool_use_id: "call_ok".to_string(),
2818            content: "hi".to_string(),
2819            is_error: false,
2820        }])
2821        .unwrap();
2822        sqlite
2823            .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2824            .await
2825            .unwrap();
2826
2827        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2828
2829        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2830            std::sync::Arc::new(memory),
2831            cid,
2832            50,
2833            5,
2834            100,
2835        );
2836
2837        let messages_before = agent.msg.messages.len();
2838        agent.load_history().await.unwrap();
2839
2840        // All 4 messages preserved, tool_use parts intact.
2841        assert_eq!(
2842            agent.msg.messages.len(),
2843            messages_before + 4,
2844            "matched tool pair must not be removed"
2845        );
2846        let tool_msg = &agent.msg.messages[messages_before + 1];
2847        assert!(
2848            tool_msg
2849                .parts
2850                .iter()
2851                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2852            "matched ToolUse parts must be preserved"
2853        );
2854    }
2855
2856    /// RC5: `persist_cancelled_tool_results` must persist a tombstone user message containing
2857    /// `is_error=true` `ToolResult` parts for all `tool_calls` IDs so the preceding assistant
2858    /// `ToolUse` is never orphaned in the DB after a cancellation.
2859    #[tokio::test]
2860    async fn persist_cancelled_tool_results_pairs_tool_use() {
2861        use zeph_llm::provider::MessagePart;
2862
2863        let provider = mock_provider(vec![]);
2864        let channel = MockChannel::new(vec![]);
2865        let registry = create_test_registry();
2866        let executor = MockToolExecutor::no_tools();
2867
2868        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2869        let cid = memory.sqlite().create_conversation().await.unwrap();
2870
2871        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2872            std::sync::Arc::new(memory),
2873            cid,
2874            50,
2875            5,
2876            100,
2877        );
2878
2879        // Simulate: assistant message with two ToolUse parts already persisted.
2880        let tool_calls = vec![
2881            zeph_llm::provider::ToolUseRequest {
2882                id: "cancel_id_1".to_string(),
2883                name: "shell".to_string().into(),
2884                input: serde_json::json!({}),
2885            },
2886            zeph_llm::provider::ToolUseRequest {
2887                id: "cancel_id_2".to_string(),
2888                name: "read_file".to_string().into(),
2889                input: serde_json::json!({}),
2890            },
2891        ];
2892
2893        agent.persist_cancelled_tool_results(&tool_calls).await;
2894
2895        let history = agent
2896            .services
2897            .memory
2898            .persistence
2899            .memory
2900            .as_ref()
2901            .unwrap()
2902            .sqlite()
2903            .load_history(cid, 50)
2904            .await
2905            .unwrap();
2906
2907        // Exactly one user message must have been persisted.
2908        assert_eq!(history.len(), 1);
2909        assert_eq!(history[0].role, Role::User);
2910
2911        // It must contain is_error=true ToolResult for each tool call ID.
2912        for tc in &tool_calls {
2913            assert!(
2914                history[0].parts.iter().any(|p| matches!(
2915                    p,
2916                    MessagePart::ToolResult { tool_use_id, is_error, .. }
2917                        if tool_use_id == &tc.id && *is_error
2918                )),
2919                "tombstone ToolResult for {} must be present and is_error=true",
2920                tc.id
2921            );
2922        }
2923    }
2924
2925    // ---- Integration tests for the #2529 fix: soft-delete of legacy-content orphans ----
2926
2927    /// #2529 regression: orphaned assistant `ToolUse` + user `ToolResult` pair where BOTH messages
2928    /// have content consisting solely of legacy tool bracket strings (no human-readable text).
2929    ///
2930    /// Before the fix, `content.trim().is_empty()` returned false for these messages, so they
2931    /// were never soft-deleted and the WARN log repeated on every session restart.
2932    ///
2933    /// After the fix, `has_meaningful_content` returns false for legacy-only content, so both
2934    /// orphaned messages are soft-deleted (non-null `deleted_at`) in `SQLite`.
2935    #[tokio::test]
2936    async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
2937        use zeph_llm::provider::MessagePart;
2938
2939        let provider = mock_provider(vec![]);
2940        let channel = MockChannel::new(vec![]);
2941        let registry = create_test_registry();
2942        let executor = MockToolExecutor::no_tools();
2943
2944        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2945        let cid = memory.sqlite().create_conversation().await.unwrap();
2946        let sqlite = memory.sqlite();
2947
2948        // A normal user message that anchors the conversation.
2949        sqlite
2950            .save_message(cid, "user", "save this for me")
2951            .await
2952            .unwrap();
2953
2954        // Orphaned assistant[ToolUse]: content is ONLY a legacy tool tag — no matching
2955        // ToolResult follows. This is the exact pattern that triggered #2529.
2956        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2957            id: "call_2529".to_string(),
2958            name: "memory_save".to_string(),
2959            input: serde_json::json!({"content": "save this"}),
2960        }])
2961        .unwrap();
2962        let orphan_assistant_id = sqlite
2963            .save_message_with_parts(
2964                cid,
2965                "assistant",
2966                "[tool_use: memory_save(call_2529)]",
2967                &use_parts,
2968            )
2969            .await
2970            .unwrap();
2971
2972        // Orphaned user[ToolResult]: content is ONLY a legacy tool tag + body.
2973        // Its tool_use_id ("call_2529") does not match any ToolUse in the preceding assistant
2974        // message in this position (will be made orphaned by inserting a plain assistant message
2975        // between them to break the pair).
2976        sqlite
2977            .save_message(cid, "assistant", "here is a plain reply")
2978            .await
2979            .unwrap();
2980
2981        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2982            tool_use_id: "call_2529".to_string(),
2983            content: "saved".to_string(),
2984            is_error: false,
2985        }])
2986        .unwrap();
2987        let orphan_user_id = sqlite
2988            .save_message_with_parts(
2989                cid,
2990                "user",
2991                "[tool_result: call_2529]\nsaved",
2992                &result_parts,
2993            )
2994            .await
2995            .unwrap();
2996
2997        // Final plain message so history doesn't end on the orphan.
2998        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2999
3000        let memory_arc = std::sync::Arc::new(memory);
3001        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3002            memory_arc.clone(),
3003            cid,
3004            50,
3005            5,
3006            100,
3007        );
3008
3009        agent.load_history().await.unwrap();
3010
3011        // Verify that both orphaned messages now have `deleted_at IS NOT NULL` in SQLite.
3012        // COUNT(*) WHERE deleted_at IS NOT NULL returns 1 if soft-deleted, 0 otherwise.
3013        let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3014            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3015        )
3016        .bind(orphan_assistant_id)
3017        .fetch_all(memory_arc.sqlite().pool())
3018        .await
3019        .unwrap();
3020
3021        let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3022            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3023        )
3024        .bind(orphan_user_id)
3025        .fetch_all(memory_arc.sqlite().pool())
3026        .await
3027        .unwrap();
3028
3029        assert_eq!(
3030            assistant_deleted_count.first().copied().unwrap_or(0),
3031            1,
3032            "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3033        );
3034        assert_eq!(
3035            user_deleted_count.first().copied().unwrap_or(0),
3036            1,
3037            "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3038        );
3039    }
3040
3041    /// #2529 idempotency: after soft-delete on first `load_history`, a second call must not
3042    /// re-load the soft-deleted orphans. This ensures the WARN log does not repeat on the
3043    /// next session startup for the same orphaned messages.
3044    #[tokio::test]
3045    async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3046        use zeph_llm::provider::MessagePart;
3047
3048        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3049        let cid = memory.sqlite().create_conversation().await.unwrap();
3050        let sqlite = memory.sqlite();
3051
3052        // Normal anchor message.
3053        sqlite
3054            .save_message(cid, "user", "do something")
3055            .await
3056            .unwrap();
3057
3058        // Orphaned assistant[ToolUse] with legacy-only content.
3059        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3060            id: "call_idem".to_string(),
3061            name: "shell".to_string(),
3062            input: serde_json::json!({"command": "ls"}),
3063        }])
3064        .unwrap();
3065        sqlite
3066            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3067            .await
3068            .unwrap();
3069
3070        // Break the pair: insert a plain assistant message so the ToolUse is mid-history orphan.
3071        sqlite
3072            .save_message(cid, "assistant", "continuing")
3073            .await
3074            .unwrap();
3075
3076        // Orphaned user[ToolResult] with legacy-only content.
3077        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3078            tool_use_id: "call_idem".to_string(),
3079            content: "output".to_string(),
3080            is_error: false,
3081        }])
3082        .unwrap();
3083        sqlite
3084            .save_message_with_parts(
3085                cid,
3086                "user",
3087                "[tool_result: call_idem]\noutput",
3088                &result_parts,
3089            )
3090            .await
3091            .unwrap();
3092
3093        sqlite
3094            .save_message(cid, "assistant", "final")
3095            .await
3096            .unwrap();
3097
3098        let memory_arc = std::sync::Arc::new(memory);
3099
3100        // First session: load_history performs soft-delete of the orphaned pair.
3101        let mut agent1 = Agent::new(
3102            mock_provider(vec![]),
3103            MockChannel::new(vec![]),
3104            create_test_registry(),
3105            None,
3106            5,
3107            MockToolExecutor::no_tools(),
3108        )
3109        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3110        agent1.load_history().await.unwrap();
3111        let count_after_first = agent1.msg.messages.len();
3112
3113        // Second session: a fresh agent loading the same conversation must not see the
3114        // soft-deleted orphans — the WARN log must not repeat.
3115        let mut agent2 = Agent::new(
3116            mock_provider(vec![]),
3117            MockChannel::new(vec![]),
3118            create_test_registry(),
3119            None,
3120            5,
3121            MockToolExecutor::no_tools(),
3122        )
3123        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3124        agent2.load_history().await.unwrap();
3125        let count_after_second = agent2.msg.messages.len();
3126
3127        // Both sessions must load the same number of messages — soft-deleted orphans excluded.
3128        assert_eq!(
3129            count_after_first, count_after_second,
3130            "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3131        );
3132    }
3133
3134    /// Edge case for #2529: an orphaned assistant message whose content has BOTH meaningful text
3135    /// AND a `tool_use` tag must NOT be soft-deleted. The `ToolUse` parts are stripped but the
3136    /// message is kept because it has human-readable content.
3137    #[tokio::test]
3138    async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3139        use zeph_llm::provider::MessagePart;
3140
3141        let provider = mock_provider(vec![]);
3142        let channel = MockChannel::new(vec![]);
3143        let registry = create_test_registry();
3144        let executor = MockToolExecutor::no_tools();
3145
3146        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3147        let cid = memory.sqlite().create_conversation().await.unwrap();
3148        let sqlite = memory.sqlite();
3149
3150        // Normal first exchange.
3151        sqlite
3152            .save_message(cid, "user", "check the files")
3153            .await
3154            .unwrap();
3155
3156        // Assistant message: has BOTH meaningful text AND a ToolUse part.
3157        // Content contains real prose + legacy tag — has_meaningful_content must return true.
3158        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3159            id: "call_mixed".to_string(),
3160            name: "shell".to_string(),
3161            input: serde_json::json!({"command": "ls"}),
3162        }])
3163        .unwrap();
3164        sqlite
3165            .save_message_with_parts(
3166                cid,
3167                "assistant",
3168                "Let me list the directory. [tool_use: shell(call_mixed)]",
3169                &use_parts,
3170            )
3171            .await
3172            .unwrap();
3173
3174        // No matching ToolResult follows — the ToolUse is orphaned.
3175        sqlite.save_message(cid, "user", "thanks").await.unwrap();
3176        sqlite
3177            .save_message(cid, "assistant", "you are welcome")
3178            .await
3179            .unwrap();
3180
3181        let memory_arc = std::sync::Arc::new(memory);
3182        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3183            memory_arc.clone(),
3184            cid,
3185            50,
3186            5,
3187            100,
3188        );
3189
3190        let messages_before = agent.msg.messages.len();
3191        agent.load_history().await.unwrap();
3192
3193        // All 4 messages must be present — the mixed-content assistant message is KEPT.
3194        assert_eq!(
3195            agent.msg.messages.len(),
3196            messages_before + 4,
3197            "assistant message with text + tool tag must not be removed after ToolUse strip"
3198        );
3199
3200        // The mixed-content assistant message must have its ToolUse parts stripped.
3201        let mixed_msg = agent
3202            .msg
3203            .messages
3204            .iter()
3205            .find(|m| m.content.contains("Let me list the directory"))
3206            .expect("mixed-content assistant message must still be in history");
3207        assert!(
3208            !mixed_msg
3209                .parts
3210                .iter()
3211                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3212            "orphaned ToolUse parts must be stripped even when message has meaningful text"
3213        );
3214        assert_eq!(
3215            mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3216            "content field must be unchanged — only parts are stripped"
3217        );
3218    }
3219
3220    // ── [skipped]/[stopped] ToolResult embedding guard (#2620) ──────────────
3221
3222    #[tokio::test]
3223    async fn persist_message_skipped_tool_result_does_not_embed() {
3224        use zeph_llm::provider::MessagePart;
3225
3226        let provider = mock_provider(vec![]);
3227        let channel = MockChannel::new(vec![]);
3228        let registry = create_test_registry();
3229        let executor = MockToolExecutor::no_tools();
3230
3231        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3232        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3233        let cid = memory.sqlite().create_conversation().await.unwrap();
3234
3235        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3236            .with_metrics(tx)
3237            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3238        agent.services.memory.persistence.autosave_assistant = true;
3239        agent.services.memory.persistence.autosave_min_length = 0;
3240
3241        let parts = vec![MessagePart::ToolResult {
3242            tool_use_id: "tu1".into(),
3243            content: "[skipped] bash tool was blocked by utility gate".into(),
3244            is_error: false,
3245        }];
3246
3247        agent
3248            .persist_message(
3249                Role::User,
3250                "[skipped] bash tool was blocked by utility gate",
3251                &parts,
3252                false,
3253            )
3254            .await;
3255
3256        assert_eq!(
3257            rx.borrow().embeddings_generated,
3258            0,
3259            "[skipped] ToolResult must not be embedded into Qdrant"
3260        );
3261    }
3262
3263    #[tokio::test]
3264    async fn persist_message_stopped_tool_result_does_not_embed() {
3265        use zeph_llm::provider::MessagePart;
3266
3267        let provider = mock_provider(vec![]);
3268        let channel = MockChannel::new(vec![]);
3269        let registry = create_test_registry();
3270        let executor = MockToolExecutor::no_tools();
3271
3272        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3273        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3274        let cid = memory.sqlite().create_conversation().await.unwrap();
3275
3276        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3277            .with_metrics(tx)
3278            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3279        agent.services.memory.persistence.autosave_assistant = true;
3280        agent.services.memory.persistence.autosave_min_length = 0;
3281
3282        let parts = vec![MessagePart::ToolResult {
3283            tool_use_id: "tu2".into(),
3284            content: "[stopped] execution limit reached".into(),
3285            is_error: false,
3286        }];
3287
3288        agent
3289            .persist_message(
3290                Role::User,
3291                "[stopped] execution limit reached",
3292                &parts,
3293                false,
3294            )
3295            .await;
3296
3297        assert_eq!(
3298            rx.borrow().embeddings_generated,
3299            0,
3300            "[stopped] ToolResult must not be embedded into Qdrant"
3301        );
3302    }
3303
3304    #[tokio::test]
3305    async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3306        // Regression: a normal ToolResult (no [skipped]/[stopped] prefix) must not be
3307        // suppressed by the utility-gate guard and must reach the save path (SQLite).
3308        use zeph_llm::provider::MessagePart;
3309
3310        let provider = mock_provider(vec![]);
3311        let channel = MockChannel::new(vec![]);
3312        let registry = create_test_registry();
3313        let executor = MockToolExecutor::no_tools();
3314
3315        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3316        let cid = memory.sqlite().create_conversation().await.unwrap();
3317        let memory_arc = std::sync::Arc::new(memory);
3318
3319        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3320            memory_arc.clone(),
3321            cid,
3322            50,
3323            5,
3324            100,
3325        );
3326        agent.services.memory.persistence.autosave_assistant = true;
3327        agent.services.memory.persistence.autosave_min_length = 0;
3328
3329        let content = "total 42\ndrwxr-xr-x  5 user group";
3330        let parts = vec![MessagePart::ToolResult {
3331            tool_use_id: "tu3".into(),
3332            content: content.into(),
3333            is_error: false,
3334        }];
3335
3336        agent
3337            .persist_message(Role::User, content, &parts, false)
3338            .await;
3339
3340        // Must be saved to SQLite — confirms the guard did not block this path.
3341        let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3342        assert_eq!(
3343            history.len(),
3344            1,
3345            "normal ToolResult must be saved to SQLite"
3346        );
3347        assert_eq!(history[0].content, content);
3348    }
3349
3350    /// Verify that `enqueue_trajectory_extraction_task` uses a bounded tail slice instead of
3351    /// cloning the full message vec. We confirm the slice logic by checking that the
3352    /// `tail_start` calculation correctly bounds the window even with more messages than
3353    /// `max_messages`.
3354    #[test]
3355    fn trajectory_extraction_slice_bounds_messages() {
3356        // Replicate the slice logic from enqueue_trajectory_extraction_task.
3357        let max_messages: usize = 20;
3358        let total_messages = 100usize;
3359
3360        let tail_start = total_messages.saturating_sub(max_messages);
3361        let window = total_messages - tail_start;
3362
3363        assert_eq!(
3364            window, 20,
3365            "slice should contain exactly max_messages items"
3366        );
3367        assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3368    }
3369
3370    #[test]
3371    fn trajectory_extraction_slice_handles_few_messages() {
3372        let max_messages: usize = 20;
3373        let total_messages = 5usize;
3374
3375        let tail_start = total_messages.saturating_sub(max_messages);
3376        let window = total_messages - tail_start;
3377
3378        assert_eq!(window, 5, "should return all messages when fewer than max");
3379        assert_eq!(tail_start, 0, "slice should start from the beginning");
3380    }
3381
3382    // --- #3168 regression tests ---
3383
3384    /// Round-trip: persist `Assistant[tool_use]` + `User[tool_result]`, then `load_history`.
3385    /// After `sanitize_tool_pairs` the pair must be intact — no WARN, both messages present
3386    /// with non-empty parts.
3387    #[tokio::test]
3388    async fn regression_3168_complete_tool_pair_survives_round_trip() {
3389        use zeph_llm::provider::MessagePart;
3390
3391        let provider = mock_provider(vec![]);
3392        let channel = MockChannel::new(vec![]);
3393        let registry = create_test_registry();
3394        let executor = MockToolExecutor::no_tools();
3395
3396        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3397        let cid = memory.sqlite().create_conversation().await.unwrap();
3398        let sqlite = memory.sqlite();
3399
3400        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3401            id: "r3168_call".to_string(),
3402            name: "shell".to_string(),
3403            input: serde_json::json!({"command": "echo hi"}),
3404        }])
3405        .unwrap();
3406        sqlite
3407            .save_message_with_parts(
3408                cid,
3409                "assistant",
3410                "[tool_use: shell(r3168_call)]",
3411                &use_parts,
3412            )
3413            .await
3414            .unwrap();
3415
3416        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3417            tool_use_id: "r3168_call".to_string(),
3418            content: "[skipped]".to_string(),
3419            is_error: false,
3420        }])
3421        .unwrap();
3422        sqlite
3423            .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3424            .await
3425            .unwrap();
3426
3427        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3428            std::sync::Arc::new(memory),
3429            cid,
3430            50,
3431            5,
3432            100,
3433        );
3434
3435        let base = agent.msg.messages.len();
3436        agent.load_history().await.unwrap();
3437
3438        assert_eq!(
3439            agent.msg.messages.len(),
3440            base + 2,
3441            "both messages of the complete pair must survive load_history"
3442        );
3443
3444        let assistant_msg = agent
3445            .msg
3446            .messages
3447            .iter()
3448            .find(|m| m.role == Role::Assistant)
3449            .expect("assistant message missing after load_history");
3450        assert!(
3451            assistant_msg
3452                .parts
3453                .iter()
3454                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3455            "ToolUse part must be preserved in assistant message"
3456        );
3457
3458        let user_msg = agent
3459            .msg
3460            .messages
3461            .iter()
3462            .rev()
3463            .find(|m| m.role == Role::User)
3464            .expect("user message missing after load_history");
3465        assert!(
3466            user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3467            "ToolResult part must be preserved in user message"
3468        );
3469    }
3470
3471    /// If parts serialization fails, `persist_message` must return early and not store
3472    /// a row with empty parts that would create an orphaned `tool_use` on next session load.
3473    /// We verify this by writing a row with invalid `parts_json` directly and confirming
3474    /// `load_history` skips it (empty parts row is not injected into the agent).
3475    #[tokio::test]
3476    async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3477        use zeph_llm::provider::MessagePart;
3478
3479        let provider = mock_provider(vec![]);
3480        let channel = MockChannel::new(vec![]);
3481        let registry = create_test_registry();
3482        let executor = MockToolExecutor::no_tools();
3483
3484        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3485        let cid = memory.sqlite().create_conversation().await.unwrap();
3486        let sqlite = memory.sqlite();
3487
3488        // Simulate the pre-fix bug: assistant message stored with empty parts_json "[]"
3489        // even though it should have had a ToolUse part. This is what persist_message used
3490        // to do before the early-return fix.
3491        sqlite
3492            .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3493            .await
3494            .unwrap();
3495
3496        // User message with the matching tool_result stored correctly.
3497        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3498            tool_use_id: "corrupt".to_string(),
3499            content: "result".to_string(),
3500            is_error: false,
3501        }])
3502        .unwrap();
3503        sqlite
3504            .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3505            .await
3506            .unwrap();
3507
3508        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3509            std::sync::Arc::new(memory),
3510            cid,
3511            50,
3512            5,
3513            100,
3514        );
3515
3516        let base = agent.msg.messages.len();
3517        agent.load_history().await.unwrap();
3518
3519        // The user ToolResult has no matching ToolUse in the assistant message (parts="[]"),
3520        // so sanitize_tool_pairs must strip the orphaned ToolResult.
3521        // Net result: only the content-only assistant message survives; user msg is removed
3522        // because after stripping its only part it becomes empty.
3523        let loaded = agent.msg.messages.len() - base;
3524        // No message injected with orphaned ToolResult parts — either stripped entirely or
3525        // the ToolResult part removed. Verify no user message with ToolResult remains.
3526        let orphan_present = agent.msg.messages.iter().any(|m| {
3527            m.role == Role::User
3528                && m.parts.iter().any(|p| {
3529                    matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3530                })
3531        });
3532        assert!(
3533            !orphan_present,
3534            "orphaned ToolResult must not survive load_history; loaded={loaded}"
3535        );
3536    }
3537}