Skip to main content

zeph_core/agent/
persistence.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::channel::Channel;
5use zeph_agent_persistence::graph::{build_graph_extraction_config, collect_context_messages};
6use zeph_agent_persistence::{
7    LoadHistoryParams, MemoryPersistenceView, MetricsView, PersistMessageRequest,
8    PersistenceService, SecurityView,
9};
10use zeph_llm::provider::{LlmProvider as _, MessagePart, Role};
11
12use super::Agent;
13
14impl<C: Channel> Agent<C> {
15    /// Load conversation history from memory and inject into messages.
16    ///
17    /// Delegates to [`PersistenceService::load_history`]. Post-load operations that touch
18    /// agent-internal singletons (session count increment, semantic fact count recompute,
19    /// token recompute) remain in this shim because they access fields outside the
20    /// borrow-lens view.
21    ///
22    /// # Errors
23    ///
24    /// Returns an error if loading history from `SQLite` fails.
25    ///
26    /// # Panics
27    ///
28    /// Does not panic. The internal `unwrap_or(0)` conversions are on fallible `i64 → usize`
29    /// casts that saturate to zero on overflow; they cannot panic.
30    #[tracing::instrument(name = "core.persist.load_history", skip_all, level = "debug", err)]
31    pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
32        let (Some(memory), Some(cid)) = (
33            self.services.memory.persistence.memory.as_ref(),
34            self.services.memory.persistence.conversation_id,
35        ) else {
36            return Ok(());
37        };
38
39        // Clone so we can call methods after the borrow-lens view is dropped.
40        let memory = memory.clone();
41
42        let mut unsummarized = self.services.memory.persistence.unsummarized_count;
43        // `memory_view` is not `mut` — the `&mut unsummarized` inside is established at
44        // construction and passed as `&memory_view` to load_history (shared borrow).
45        let memory_view = MemoryPersistenceView {
46            memory: Some(&memory),
47            conversation_id: self.services.memory.persistence.conversation_id,
48            autosave_assistant: self.services.memory.persistence.autosave_assistant,
49            autosave_min_length: self.services.memory.persistence.autosave_min_length,
50            unsummarized_count: &mut unsummarized,
51            goal_text: self.services.memory.extraction.goal_text.clone(),
52        };
53
54        let svc = PersistenceService::new();
55        let outcome = svc
56            .load_history(LoadHistoryParams {
57                messages: &mut self.msg.messages,
58                last_persisted_message_id: &mut self.msg.last_persisted_message_id,
59                deferred_hide_ids: &mut self.msg.deferred_db_hide_ids,
60                memory_view: &memory_view,
61            })
62            .await
63            .map_err(|e| {
64                super::error::AgentError::Memory(zeph_memory::MemoryError::Other(e.to_string()))
65            })?;
66
67        // Write back lens-borrowed local to the field.
68        self.services.memory.persistence.unsummarized_count = unsummarized;
69
70        if outcome.messages_loaded > 0 {
71            // Increment session counts so tier promotion can track cross-session access.
72            let _ = memory
73                .sqlite()
74                .increment_session_counts_for_conversation(cid)
75                .await
76                .inspect_err(|e| {
77                    tracing::warn!(error = %e, "failed to increment tier session counts");
78                });
79        }
80
81        // Set absolute SQLite message count and semantic fact count (not deltas).
82        self.update_metrics(|m| {
83            m.sqlite_message_count = outcome.sqlite_total_messages;
84        });
85        if let Ok(count) = memory.sqlite().count_semantic_facts().await {
86            let count_u64 = u64::try_from(count).unwrap_or(0);
87            self.update_metrics(|m| {
88                m.semantic_fact_count = count_u64;
89            });
90        }
91        if let Ok(count) = memory.unsummarized_message_count(cid).await {
92            self.services.memory.persistence.unsummarized_count =
93                usize::try_from(count).unwrap_or(0);
94        }
95
96        self.recompute_prompt_tokens();
97        Ok(())
98    }
99
100    /// Persist a message to memory.
101    ///
102    /// `has_injection_flags` controls whether Qdrant embedding is skipped for this message.
103    /// When `true` and `guard_memory_writes` is enabled, only `SQLite` is written — the message
104    /// is saved for conversation continuity but will not pollute semantic search (M2, D2).
105    #[tracing::instrument(name = "core.persist.persist_message", skip_all, level = "debug")]
106    pub(crate) async fn persist_message(
107        &mut self,
108        role: Role,
109        content: &str,
110        parts: &[MessagePart],
111        has_injection_flags: bool,
112    ) {
113        // M2: call should_guard_memory_write for its diagnostic side effects (tracing + security
114        // event). The bool result is passed into SecurityView so the service can decide whether
115        // to skip Qdrant embedding.
116        let guard_event = self
117            .services
118            .security
119            .exfiltration_guard
120            .should_guard_memory_write(has_injection_flags);
121        if let Some(ref event) = guard_event {
122            tracing::warn!(
123                ?event,
124                "exfiltration guard: skipping Qdrant embedding for flagged content"
125            );
126            self.push_security_event(
127                zeph_common::SecurityEventCategory::ExfiltrationBlock,
128                "memory_write",
129                "Qdrant embedding skipped: flagged content",
130            );
131        }
132
133        let req = PersistMessageRequest::from_borrowed(role, content, parts, has_injection_flags);
134
135        let mut unsummarized = self.services.memory.persistence.unsummarized_count;
136        let memory_arc = self.services.memory.persistence.memory.clone();
137        let mut memory_view = MemoryPersistenceView {
138            memory: memory_arc.as_ref(),
139            conversation_id: self.services.memory.persistence.conversation_id,
140            autosave_assistant: self.services.memory.persistence.autosave_assistant,
141            autosave_min_length: self.services.memory.persistence.autosave_min_length,
142            unsummarized_count: &mut unsummarized,
143            goal_text: self.services.memory.extraction.goal_text.clone(),
144        };
145        let security = SecurityView {
146            guard_memory_writes: guard_event.is_some(),
147            _phantom: std::marker::PhantomData,
148        };
149        let mut sqlite_delta = 0u64;
150        let mut embed_delta = 0u64;
151        let mut guard_delta = 0u64;
152        let mut metrics_view = MetricsView {
153            sqlite_message_count: &mut sqlite_delta,
154            embeddings_generated: &mut embed_delta,
155            exfiltration_memory_guards: &mut guard_delta,
156        };
157
158        let svc = PersistenceService::new();
159        let outcome = svc
160            .persist_message(
161                req,
162                &mut self.msg.last_persisted_message_id,
163                &mut memory_view,
164                &security,
165                &mut metrics_view,
166            )
167            .await;
168
169        // Write back the unsummarized counter (lens borrowed a local copy).
170        self.services.memory.persistence.unsummarized_count = unsummarized;
171
172        // Forward metric deltas through the watch broadcast.
173        self.update_metrics(|m| {
174            m.sqlite_message_count += sqlite_delta;
175            m.embeddings_generated += embed_delta;
176            // guard_delta is already tracked via push_security_event above.
177            m.exfiltration_memory_guards += guard_delta;
178        });
179
180        if outcome.message_id.is_none() {
181            return;
182        }
183
184        // Phase 2: enqueue enrichment tasks via supervisor (non-blocking).
185        // check_summarization signals completion via SummarizationSignal, consumed in reap()
186        // between turns — no shared mutable state across tasks (S1 fix).
187        self.enqueue_summarization_task();
188
189        // FIX-1: skip graph extraction for tool result messages — they contain raw structured
190        // output (TOML, JSON, code) that pollutes the entity graph with noise.
191        let has_tool_result_parts = parts
192            .iter()
193            .any(|p| matches!(p, MessagePart::ToolResult { .. }));
194
195        self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
196            .await;
197
198        // Persona extraction: run only for user messages that are not tool results and not injected.
199        if role == Role::User && !has_tool_result_parts && !has_injection_flags {
200            self.enqueue_persona_extraction_task();
201        }
202
203        // Trajectory extraction: run after turns that contained tool results.
204        if has_tool_result_parts {
205            self.enqueue_trajectory_extraction_task();
206        }
207
208        // ReasoningBank distillation: runs only after the final assistant message of a turn
209        // (C2 fix: skip intermediate tool-call messages). A message with ToolUse parts is an
210        // intermediate step; the final assistant message has no ToolUse parts.
211        // S-Med1: skip if injection patterns detected — mirrors graph extraction guard.
212        let has_tool_use_parts = parts
213            .iter()
214            .any(|p| matches!(p, MessagePart::ToolUse { .. }));
215        if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
216            self.enqueue_reasoning_extraction_task();
217            // MemCoT distillation: same guards as ReasoningBank.
218            self.enqueue_memcot_distill_task(content);
219        }
220    }
221
222    /// Enqueue `MemCoT` semantic state distillation via the supervisor.
223    ///
224    /// All cost gates (interval, session cap, min chars) are checked inside
225    /// [`crate::agent::memcot::SemanticStateAccumulator::maybe_enqueue_distill`].
226    fn enqueue_memcot_distill_task(&mut self, assistant_content: &str) {
227        let Some(accumulator) = &self.services.memory.extraction.memcot_accumulator else {
228            return;
229        };
230        let distill_provider_name = self
231            .services
232            .memory
233            .extraction
234            .memcot_config
235            .distill_provider
236            .as_str();
237        let provider = self.resolve_background_provider(distill_provider_name);
238
239        let content = assistant_content.to_owned();
240        let supervisor = &mut self.runtime.lifecycle.supervisor;
241
242        accumulator.maybe_enqueue_distill(&content, provider, |name, fut| {
243            supervisor.spawn(super::agent_supervisor::TaskClass::Enrichment, name, fut);
244        });
245    }
246
247    /// Enqueue background summarization via the supervisor (S1 fix: no shared `AtomicUsize`).
248    fn enqueue_summarization_task(&mut self) {
249        let (Some(memory), Some(cid)) = (
250            self.services.memory.persistence.memory.clone(),
251            self.services.memory.persistence.conversation_id,
252        ) else {
253            return;
254        };
255
256        if self.services.memory.persistence.unsummarized_count
257            <= self.services.memory.compaction.summarization_threshold
258        {
259            return;
260        }
261
262        let batch_size = self.services.memory.compaction.summarization_threshold / 2;
263
264        self.runtime.lifecycle.supervisor.spawn_summarization("summarization", async move {
265            match tokio::time::timeout(
266                std::time::Duration::from_secs(30),
267                memory.summarize(cid, batch_size),
268            )
269            .await
270            {
271                Ok(Ok(Some(summary_id))) => {
272                    tracing::info!(
273                        "background summarization: created summary {summary_id} for conversation {cid}"
274                    );
275                    true
276                }
277                Ok(Ok(None)) => {
278                    tracing::debug!("background summarization: no summarization needed");
279                    false
280                }
281                Ok(Err(e)) => {
282                    tracing::error!("background summarization failed: {e:#}");
283                    false
284                }
285                Err(_) => {
286                    tracing::warn!("background summarization timed out after 30s");
287                    false
288                }
289            }
290        });
291    }
292
293    /// Prepare graph extraction guards in foreground, then enqueue heavy work via supervisor.
294    ///
295    /// Guards (enabled check, injection/tool-result skip) stay on the foreground path.
296    /// The RPE check and actual extraction run in background (S2: no `send_status`).
297    #[tracing::instrument(
298        name = "core.persist.enqueue_graph_extraction",
299        skip_all,
300        level = "debug"
301    )]
302    async fn enqueue_graph_extraction_task(
303        &mut self,
304        content: &str,
305        has_injection_flags: bool,
306        has_tool_result_parts: bool,
307    ) {
308        if self.services.memory.persistence.memory.is_none()
309            || self.services.memory.persistence.conversation_id.is_none()
310        {
311            return;
312        }
313        if has_tool_result_parts {
314            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
315            return;
316        }
317        if has_injection_flags {
318            tracing::warn!("graph extraction skipped: injection patterns detected in content");
319            return;
320        }
321
322        let cfg = &self.services.memory.extraction.graph_config;
323        if !cfg.enabled {
324            return;
325        }
326        let extraction_cfg = build_graph_extraction_config(
327            cfg,
328            self.services
329                .memory
330                .persistence
331                .conversation_id
332                .map(|c| c.0),
333        );
334        // Resolve a clean provider that bypasses quality_gate for JSON extraction tasks.
335        // When extract_provider is empty, falls back to the primary provider (existing behavior).
336        let extract_provider_name = cfg.extract_provider.as_str().to_owned();
337
338        // RPE check: embed + compute surprise score. Stays on foreground to avoid
339        // capturing the rpe_router mutex in a background task.
340        if self.rpe_should_skip(content).await {
341            tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
342            return;
343        }
344
345        let context_messages = collect_context_messages(&self.msg.messages);
346
347        let Some(memory) = self.services.memory.persistence.memory.clone() else {
348            return;
349        };
350
351        let validator: zeph_memory::semantic::PostExtractValidator =
352            if self.services.security.memory_validator.is_enabled() {
353                let v = self.services.security.memory_validator.clone();
354                Some(Box::new(move |result| {
355                    v.validate_graph_extraction(result)
356                        .map_err(|e| e.to_string())
357                }))
358            } else {
359                None
360            };
361
362        let provider_override = if extract_provider_name.is_empty() {
363            None
364        } else {
365            Some(self.resolve_background_provider(&extract_provider_name))
366        };
367
368        self.spawn_graph_extraction_task(
369            memory,
370            content,
371            context_messages,
372            extraction_cfg,
373            validator,
374            provider_override,
375        );
376
377        // Sync community failures and extraction metrics (cheap, foreground-safe).
378        self.sync_community_detection_failures();
379        self.sync_graph_extraction_metrics();
380        self.enqueue_graph_count_sync_task();
381    }
382
383    fn spawn_graph_extraction_task(
384        &mut self,
385        memory: std::sync::Arc<zeph_memory::semantic::SemanticMemory>,
386        content: &str,
387        context_messages: Vec<String>,
388        extraction_cfg: zeph_memory::semantic::GraphExtractionConfig,
389        validator: zeph_memory::semantic::PostExtractValidator,
390        provider_override: Option<zeph_llm::any::AnyProvider>,
391    ) {
392        let content_owned = content.to_owned();
393        let graph_store = memory.graph_store.clone();
394        let metrics_tx = self.runtime.metrics.metrics_tx.clone();
395        let start_time = self.runtime.lifecycle.start_time;
396
397        self.runtime.lifecycle.supervisor.spawn(
398            super::agent_supervisor::TaskClass::Enrichment,
399            "graph_extraction",
400            async move {
401                let extraction_handle = memory.spawn_graph_extraction(
402                    content_owned,
403                    context_messages,
404                    extraction_cfg,
405                    validator,
406                    provider_override,
407                );
408
409                // After extraction completes, refresh graph count metrics.
410                if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
411                    let _ = extraction_handle.await;
412                    let (entities, edges, communities) = tokio::join!(
413                        store.entity_count(),
414                        store.active_edge_count(),
415                        store.community_count()
416                    );
417                    let elapsed = start_time.elapsed().as_secs();
418                    tx.send_modify(|m| {
419                        m.uptime_seconds = elapsed;
420                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
421                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
422                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
423                    });
424                } else {
425                    let _ = extraction_handle.await;
426                }
427
428                tracing::debug!("background graph extraction complete");
429            },
430        );
431    }
432
433    // sync_graph_counts and sync_guidelines_status are DB reads; enqueued as Telemetry background.
434    fn enqueue_graph_count_sync_task(&mut self) {
435        let memory_for_sync = self.services.memory.persistence.memory.clone();
436        let metrics_tx_sync = self.runtime.metrics.metrics_tx.clone();
437        let start_time_sync = self.runtime.lifecycle.start_time;
438        let cid_sync = self.services.memory.persistence.conversation_id;
439        let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
440        let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
441        let guidelines_enabled = self.services.memory.extraction.graph_config.enabled;
442
443        self.runtime.lifecycle.supervisor.spawn(
444            super::agent_supervisor::TaskClass::Telemetry,
445            "graph_count_sync",
446            async move {
447                let Some(store) = graph_store_sync else {
448                    return;
449                };
450                let Some(tx) = metrics_tx_sync else { return };
451
452                let (entities, edges, communities) = tokio::join!(
453                    store.entity_count(),
454                    store.active_edge_count(),
455                    store.community_count()
456                );
457                let elapsed = start_time_sync.elapsed().as_secs();
458                tx.send_modify(|m| {
459                    m.uptime_seconds = elapsed;
460                    m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
461                    m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
462                    m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
463                });
464
465                // Sync guidelines status.
466                if guidelines_enabled && let Some(sqlite) = sqlite_sync {
467                    match tokio::time::timeout(
468                        std::time::Duration::from_secs(10),
469                        sqlite.load_compression_guidelines_meta(cid_sync),
470                    )
471                    .await
472                    {
473                        Ok(Ok((version, created_at))) => {
474                            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
475                            let version_u32 = u32::try_from(version).unwrap_or(0);
476                            tx.send_modify(|m| {
477                                m.guidelines_version = version_u32;
478                                m.guidelines_updated_at = created_at;
479                            });
480                        }
481                        Ok(Err(e)) => {
482                            tracing::debug!("guidelines status sync failed: {e:#}");
483                        }
484                        Err(_) => {
485                            tracing::debug!("guidelines status sync timed out");
486                        }
487                    }
488                }
489            },
490        );
491    }
492
493    /// Enqueue persona extraction via supervisor (background, no `send_status`).
494    fn enqueue_persona_extraction_task(&mut self) {
495        use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
496
497        let cfg = &self.services.memory.extraction.persona_config;
498        if !cfg.enabled {
499            return;
500        }
501
502        let Some(memory) = &self.services.memory.persistence.memory else {
503            return;
504        };
505
506        let user_messages: Vec<String> = self
507            .msg
508            .messages
509            .iter()
510            .filter(|m| {
511                m.role == Role::User
512                    && !m
513                        .parts
514                        .iter()
515                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
516            })
517            .take(8)
518            .map(|m| {
519                if m.content.len() > 2048 {
520                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
521                } else {
522                    m.content.clone()
523                }
524            })
525            .collect();
526
527        if user_messages.len() < cfg.min_messages {
528            return;
529        }
530
531        let timeout_secs = cfg.extraction_timeout_secs;
532        let extraction_cfg = PersonaExtractionConfig {
533            enabled: cfg.enabled,
534            min_messages: cfg.min_messages,
535            max_messages: cfg.max_messages,
536            extraction_timeout_secs: timeout_secs,
537        };
538
539        let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
540        let store = memory.sqlite().clone();
541        let conversation_id = self
542            .services
543            .memory
544            .persistence
545            .conversation_id
546            .map(|c| c.0);
547
548        self.runtime.lifecycle.supervisor.spawn(
549            super::agent_supervisor::TaskClass::Enrichment,
550            "persona_extraction",
551            async move {
552                let user_message_refs: Vec<&str> =
553                    user_messages.iter().map(String::as_str).collect();
554                let fut = extract_persona_facts(
555                    &store,
556                    &provider,
557                    &user_message_refs,
558                    &extraction_cfg,
559                    conversation_id,
560                );
561                match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
562                {
563                    Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
564                    Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
565                    Err(_) => tracing::warn!(
566                        timeout_secs,
567                        "persona extraction timed out — no facts written this turn"
568                    ),
569                }
570            },
571        );
572    }
573
574    /// Enqueue trajectory extraction via supervisor (background).
575    fn enqueue_trajectory_extraction_task(&mut self) {
576        use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
577
578        let cfg = self.services.memory.extraction.trajectory_config.clone();
579        if !cfg.enabled {
580            return;
581        }
582
583        let Some(memory) = &self.services.memory.persistence.memory else {
584            return;
585        };
586
587        let conversation_id = match self.services.memory.persistence.conversation_id {
588            Some(cid) => cid.0,
589            None => return,
590        };
591
592        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
593        let turn_messages: Vec<zeph_llm::provider::Message> =
594            self.msg.messages[tail_start..].to_vec();
595
596        if turn_messages.is_empty() {
597            return;
598        }
599
600        let extraction_cfg = TrajectoryExtractionConfig {
601            enabled: cfg.enabled,
602            max_messages: cfg.max_messages,
603            extraction_timeout_secs: cfg.extraction_timeout_secs,
604        };
605
606        let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
607        let store = memory.sqlite().clone();
608        let min_confidence = cfg.min_confidence;
609
610        self.runtime.lifecycle.supervisor.spawn(
611            super::agent_supervisor::TaskClass::Enrichment,
612            "trajectory_extraction",
613            async move {
614                let entries =
615                    match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
616                        .await
617                    {
618                        Ok(e) => e,
619                        Err(e) => {
620                            tracing::warn!(error = %e, "trajectory extraction failed");
621                            return;
622                        }
623                    };
624
625                let last_id = store
626                    .trajectory_last_extracted_message_id(conversation_id)
627                    .await
628                    .unwrap_or(0);
629
630                let mut max_id = last_id;
631                for entry in &entries {
632                    if entry.confidence < min_confidence {
633                        continue;
634                    }
635                    let tools_json = serde_json::to_string(&entry.tools_used)
636                        .unwrap_or_else(|_| "[]".to_string());
637                    match store
638                        .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
639                            conversation_id: Some(conversation_id),
640                            turn_index: 0,
641                            kind: &entry.kind,
642                            intent: &entry.intent,
643                            outcome: &entry.outcome,
644                            tools_used: &tools_json,
645                            confidence: entry.confidence,
646                        })
647                        .await
648                    {
649                        Ok(id) => {
650                            if id > max_id {
651                                max_id = id;
652                            }
653                        }
654                        Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
655                    }
656                }
657
658                if max_id > last_id {
659                    let _ = store
660                        .set_trajectory_last_extracted_message_id(conversation_id, max_id)
661                        .await;
662                }
663
664                tracing::debug!(
665                    count = entries.len(),
666                    conversation_id,
667                    "trajectory extraction complete"
668                );
669            },
670        );
671    }
672
673    /// Enqueue reasoning strategy distillation via supervisor (background, fire-and-forget).
674    ///
675    /// Mirrors [`Self::enqueue_trajectory_extraction_task`]. Runs after every assistant turn
676    /// when `memory.reasoning.enabled = true` and a `ReasoningMemory` is attached.
677    fn enqueue_reasoning_extraction_task(&mut self) {
678        let cfg = self.services.memory.extraction.reasoning_config.clone();
679        if !cfg.enabled {
680            return;
681        }
682
683        let Some(memory) = &self.services.memory.persistence.memory else {
684            return;
685        };
686
687        let Some(reasoning) = memory.reasoning.clone() else {
688            return;
689        };
690
691        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
692        let turn_messages: Vec<zeph_llm::provider::Message> =
693            self.msg.messages[tail_start..].to_vec();
694
695        if turn_messages.len() < cfg.min_messages {
696            return;
697        }
698
699        let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
700        let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
701        let embed_provider = memory.effective_embed_provider().clone();
702        let store_limit = cfg.store_limit;
703        let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
704        let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
705        let self_judge_window = cfg.self_judge_window;
706        let min_assistant_chars = cfg.min_assistant_chars;
707
708        self.runtime.lifecycle.supervisor.spawn(
709            super::agent_supervisor::TaskClass::Enrichment,
710            "reasoning_extraction",
711            async move {
712                if let Err(e) = zeph_memory::process_reasoning_turn(
713                    &reasoning,
714                    &extract_provider,
715                    &distill_provider,
716                    &embed_provider,
717                    &turn_messages,
718                    zeph_memory::ProcessTurnConfig {
719                        store_limit,
720                        extraction_timeout,
721                        distill_timeout,
722                        self_judge_window,
723                        min_assistant_chars,
724                    },
725                )
726                .await
727                {
728                    tracing::warn!(error = %e, "reasoning: process_turn failed");
729                }
730
731                tracing::debug!("reasoning extraction complete");
732            },
733        );
734    }
735
736    /// D-MEM RPE check: returns `true` when the current turn should skip graph extraction.
737    ///
738    /// Embeds `content`, computes RPE via the router, and updates the router state.
739    /// Returns `false` (do not skip) on any error — conservative fallback.
740    async fn rpe_should_skip(&mut self, content: &str) -> bool {
741        let Some(ref rpe_mutex) = self.services.memory.extraction.rpe_router else {
742            return false;
743        };
744        let Some(memory) = &self.services.memory.persistence.memory else {
745            return false;
746        };
747        let candidates = zeph_memory::extract_candidate_entities(content);
748        let provider = memory.provider();
749        let Ok(Ok(emb_vec)) =
750            tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
751        else {
752            return false; // embed failed/timed out → extract
753        };
754        if let Ok(mut router) = rpe_mutex.lock() {
755            let signal = router.compute(&emb_vec, &candidates);
756            router.push_embedding(emb_vec);
757            router.push_entities(&candidates);
758            !signal.should_extract
759        } else {
760            tracing::warn!("rpe_router mutex poisoned; falling through to extract");
761            false
762        }
763    }
764}
765
766#[cfg(test)]
767mod tests {
768    use super::super::agent_tests::{
769        MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
770    };
771    use super::*;
772    use zeph_llm::any::AnyProvider;
773    use zeph_llm::provider::Message;
774    use zeph_memory::semantic::SemanticMemory;
775
776    async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
777        SemanticMemory::new(
778            ":memory:",
779            "http://127.0.0.1:1",
780            None,
781            provider.clone(),
782            "test-model",
783        )
784        .await
785        .unwrap()
786    }
787
788    #[tokio::test]
789    async fn load_history_without_memory_returns_ok() {
790        let provider = mock_provider(vec![]);
791        let channel = MockChannel::new(vec![]);
792        let registry = create_test_registry();
793        let executor = MockToolExecutor::no_tools();
794        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
795
796        let result = agent.load_history().await;
797        assert!(result.is_ok());
798        // No messages added when no memory is configured
799        assert_eq!(agent.msg.messages.len(), 1); // system prompt only
800    }
801
802    #[tokio::test]
803    async fn load_history_with_messages_injects_into_agent() {
804        let provider = mock_provider(vec![]);
805        let channel = MockChannel::new(vec![]);
806        let registry = create_test_registry();
807        let executor = MockToolExecutor::no_tools();
808
809        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
810        let cid = memory.sqlite().create_conversation().await.unwrap();
811
812        memory
813            .sqlite()
814            .save_message(cid, "user", "hello from history")
815            .await
816            .unwrap();
817        memory
818            .sqlite()
819            .save_message(cid, "assistant", "hi back")
820            .await
821            .unwrap();
822
823        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
824            std::sync::Arc::new(memory),
825            cid,
826            50,
827            5,
828            100,
829        );
830
831        let messages_before = agent.msg.messages.len();
832        agent.load_history().await.unwrap();
833        // Two messages were added from history
834        assert_eq!(agent.msg.messages.len(), messages_before + 2);
835    }
836
837    #[tokio::test]
838    async fn load_history_skips_empty_messages() {
839        let provider = mock_provider(vec![]);
840        let channel = MockChannel::new(vec![]);
841        let registry = create_test_registry();
842        let executor = MockToolExecutor::no_tools();
843
844        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
845        let cid = memory.sqlite().create_conversation().await.unwrap();
846
847        // Save an empty message (should be skipped) and a valid one
848        memory
849            .sqlite()
850            .save_message(cid, "user", "   ")
851            .await
852            .unwrap();
853        memory
854            .sqlite()
855            .save_message(cid, "user", "real message")
856            .await
857            .unwrap();
858
859        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
860            std::sync::Arc::new(memory),
861            cid,
862            50,
863            5,
864            100,
865        );
866
867        let messages_before = agent.msg.messages.len();
868        agent.load_history().await.unwrap();
869        // Only the non-empty message is loaded
870        assert_eq!(agent.msg.messages.len(), messages_before + 1);
871    }
872
873    #[tokio::test]
874    async fn load_history_with_empty_store_returns_ok() {
875        let provider = mock_provider(vec![]);
876        let channel = MockChannel::new(vec![]);
877        let registry = create_test_registry();
878        let executor = MockToolExecutor::no_tools();
879
880        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
881        let cid = memory.sqlite().create_conversation().await.unwrap();
882
883        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
884            std::sync::Arc::new(memory),
885            cid,
886            50,
887            5,
888            100,
889        );
890
891        let messages_before = agent.msg.messages.len();
892        agent.load_history().await.unwrap();
893        // No messages added — empty history
894        assert_eq!(agent.msg.messages.len(), messages_before);
895    }
896
897    #[tokio::test]
898    async fn load_history_increments_session_count_for_existing_messages() {
899        let provider = mock_provider(vec![]);
900        let channel = MockChannel::new(vec![]);
901        let registry = create_test_registry();
902        let executor = MockToolExecutor::no_tools();
903
904        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
905        let cid = memory.sqlite().create_conversation().await.unwrap();
906
907        // Save two messages — they start with session_count = 0.
908        let id1 = memory
909            .sqlite()
910            .save_message(cid, "user", "hello")
911            .await
912            .unwrap();
913        let id2 = memory
914            .sqlite()
915            .save_message(cid, "assistant", "hi")
916            .await
917            .unwrap();
918
919        let memory_arc = std::sync::Arc::new(memory);
920        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
921            memory_arc.clone(),
922            cid,
923            50,
924            5,
925            100,
926        );
927
928        agent.load_history().await.unwrap();
929
930        // Both episodic messages must have session_count = 1 after restore.
931        let counts: Vec<i64> = zeph_db::query_scalar(
932            "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
933        )
934        .bind(id1)
935        .bind(id2)
936        .fetch_all(memory_arc.sqlite().pool())
937        .await
938        .unwrap();
939        assert_eq!(
940            counts,
941            vec![1, 1],
942            "session_count must be 1 after first restore"
943        );
944    }
945
946    #[tokio::test]
947    async fn load_history_does_not_increment_session_count_for_new_conversation() {
948        let provider = mock_provider(vec![]);
949        let channel = MockChannel::new(vec![]);
950        let registry = create_test_registry();
951        let executor = MockToolExecutor::no_tools();
952
953        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
954        let cid = memory.sqlite().create_conversation().await.unwrap();
955
956        // No messages saved — empty conversation.
957        let memory_arc = std::sync::Arc::new(memory);
958        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
959            memory_arc.clone(),
960            cid,
961            50,
962            5,
963            100,
964        );
965
966        agent.load_history().await.unwrap();
967
968        // No rows → no session_count increments → query returns empty.
969        let counts: Vec<i64> =
970            zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
971                .bind(cid)
972                .fetch_all(memory_arc.sqlite().pool())
973                .await
974                .unwrap();
975        assert!(counts.is_empty(), "new conversation must have no messages");
976    }
977
978    #[tokio::test]
979    async fn persist_message_without_memory_silently_returns() {
980        // No memory configured — persist_message must not panic
981        let provider = mock_provider(vec![]);
982        let channel = MockChannel::new(vec![]);
983        let registry = create_test_registry();
984        let executor = MockToolExecutor::no_tools();
985        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
986
987        // Must not panic and must complete
988        agent.persist_message(Role::User, "hello", &[], false).await;
989    }
990
991    #[tokio::test]
992    async fn persist_message_assistant_autosave_false_uses_save_only() {
993        let provider = mock_provider(vec![]);
994        let channel = MockChannel::new(vec![]);
995        let registry = create_test_registry();
996        let executor = MockToolExecutor::no_tools();
997
998        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
999        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1000        let cid = memory.sqlite().create_conversation().await.unwrap();
1001
1002        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1003            .with_metrics(tx)
1004            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1005        agent.services.memory.persistence.autosave_assistant = false;
1006        agent.services.memory.persistence.autosave_min_length = 20;
1007
1008        agent
1009            .persist_message(Role::Assistant, "short assistant reply", &[], false)
1010            .await;
1011
1012        let history = agent
1013            .services
1014            .memory
1015            .persistence
1016            .memory
1017            .as_ref()
1018            .unwrap()
1019            .sqlite()
1020            .load_history(cid, 50)
1021            .await
1022            .unwrap();
1023        assert_eq!(history.len(), 1, "message must be saved");
1024        assert_eq!(history[0].content, "short assistant reply");
1025        // embeddings_generated must remain 0 — save_only path does not embed
1026        assert_eq!(rx.borrow().embeddings_generated, 0);
1027    }
1028
1029    #[tokio::test]
1030    async fn persist_message_assistant_below_min_length_uses_save_only() {
1031        let provider = mock_provider(vec![]);
1032        let channel = MockChannel::new(vec![]);
1033        let registry = create_test_registry();
1034        let executor = MockToolExecutor::no_tools();
1035
1036        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1037        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1038        let cid = memory.sqlite().create_conversation().await.unwrap();
1039
1040        // autosave_assistant=true but min_length=1000 — short content falls back to save_only
1041        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1042            .with_metrics(tx)
1043            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1044        agent.services.memory.persistence.autosave_assistant = true;
1045        agent.services.memory.persistence.autosave_min_length = 1000;
1046
1047        agent
1048            .persist_message(Role::Assistant, "too short", &[], false)
1049            .await;
1050
1051        let history = agent
1052            .services
1053            .memory
1054            .persistence
1055            .memory
1056            .as_ref()
1057            .unwrap()
1058            .sqlite()
1059            .load_history(cid, 50)
1060            .await
1061            .unwrap();
1062        assert_eq!(history.len(), 1, "message must be saved");
1063        assert_eq!(history[0].content, "too short");
1064        assert_eq!(rx.borrow().embeddings_generated, 0);
1065    }
1066
1067    #[tokio::test]
1068    async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1069        // content.len() == autosave_min_length → should_embed = true (>= boundary).
1070        let provider = mock_provider(vec![]);
1071        let channel = MockChannel::new(vec![]);
1072        let registry = create_test_registry();
1073        let executor = MockToolExecutor::no_tools();
1074
1075        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1076        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1077        let cid = memory.sqlite().create_conversation().await.unwrap();
1078
1079        let min_length = 10usize;
1080        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1081            .with_metrics(tx)
1082            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1083        agent.services.memory.persistence.autosave_assistant = true;
1084        agent.services.memory.persistence.autosave_min_length = min_length;
1085
1086        // Exact boundary: len == min_length → embed path.
1087        let content_at_boundary = "A".repeat(min_length);
1088        assert_eq!(content_at_boundary.len(), min_length);
1089        agent
1090            .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1091            .await;
1092
1093        // sqlite_message_count must be incremented regardless of embedding success.
1094        assert_eq!(rx.borrow().sqlite_message_count, 1);
1095    }
1096
1097    #[tokio::test]
1098    async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1099        // content.len() == autosave_min_length - 1 → should_embed = false (below boundary).
1100        let provider = mock_provider(vec![]);
1101        let channel = MockChannel::new(vec![]);
1102        let registry = create_test_registry();
1103        let executor = MockToolExecutor::no_tools();
1104
1105        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1106        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1107        let cid = memory.sqlite().create_conversation().await.unwrap();
1108
1109        let min_length = 10usize;
1110        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1111            .with_metrics(tx)
1112            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1113        agent.services.memory.persistence.autosave_assistant = true;
1114        agent.services.memory.persistence.autosave_min_length = min_length;
1115
1116        // One below boundary: len == min_length - 1 → save_only path, no embedding.
1117        let content_below_boundary = "A".repeat(min_length - 1);
1118        assert_eq!(content_below_boundary.len(), min_length - 1);
1119        agent
1120            .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1121            .await;
1122
1123        let history = agent
1124            .services
1125            .memory
1126            .persistence
1127            .memory
1128            .as_ref()
1129            .unwrap()
1130            .sqlite()
1131            .load_history(cid, 50)
1132            .await
1133            .unwrap();
1134        assert_eq!(history.len(), 1, "message must still be saved");
1135        // save_only path does not embed.
1136        assert_eq!(rx.borrow().embeddings_generated, 0);
1137    }
1138
1139    #[tokio::test]
1140    async fn persist_message_increments_unsummarized_count() {
1141        let provider = mock_provider(vec![]);
1142        let channel = MockChannel::new(vec![]);
1143        let registry = create_test_registry();
1144        let executor = MockToolExecutor::no_tools();
1145
1146        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1147        let cid = memory.sqlite().create_conversation().await.unwrap();
1148
1149        // threshold=100 ensures no summarization is triggered
1150        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1151            std::sync::Arc::new(memory),
1152            cid,
1153            50,
1154            5,
1155            100,
1156        );
1157
1158        assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1159
1160        agent.persist_message(Role::User, "first", &[], false).await;
1161        assert_eq!(agent.services.memory.persistence.unsummarized_count, 1);
1162
1163        agent
1164            .persist_message(Role::User, "second", &[], false)
1165            .await;
1166        assert_eq!(agent.services.memory.persistence.unsummarized_count, 2);
1167    }
1168
1169    #[tokio::test]
1170    async fn check_summarization_resets_counter_on_success() {
1171        let provider = mock_provider(vec![]);
1172        let channel = MockChannel::new(vec![]);
1173        let registry = create_test_registry();
1174        let executor = MockToolExecutor::no_tools();
1175
1176        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1177        let cid = memory.sqlite().create_conversation().await.unwrap();
1178
1179        // threshold=1 so the second persist triggers summarization check (count > threshold)
1180        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1181            std::sync::Arc::new(memory),
1182            cid,
1183            50,
1184            5,
1185            1,
1186        );
1187
1188        agent.persist_message(Role::User, "msg1", &[], false).await;
1189        agent.persist_message(Role::User, "msg2", &[], false).await;
1190
1191        // After summarization attempt (summarize returns Ok(None) since no messages qualify),
1192        // the counter is NOT reset to 0 — only reset on Ok(Some(_)).
1193        // This verifies check_summarization is called and the guard condition works.
1194        // unsummarized_count must be >= 2 before any summarization or 0 if summarization ran.
1195        assert!(agent.services.memory.persistence.unsummarized_count <= 2);
1196    }
1197
1198    #[tokio::test]
1199    async fn unsummarized_count_not_incremented_without_memory() {
1200        let provider = mock_provider(vec![]);
1201        let channel = MockChannel::new(vec![]);
1202        let registry = create_test_registry();
1203        let executor = MockToolExecutor::no_tools();
1204        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1205
1206        agent.persist_message(Role::User, "hello", &[], false).await;
1207        // No memory configured — persist_message returns early, counter must stay 0.
1208        assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1209    }
1210
1211    // R-CRIT-01: unit tests for enqueue_graph_extraction_task guard conditions.
1212    mod graph_extraction_guards {
1213        use super::*;
1214        use crate::config::GraphConfig;
1215        use zeph_llm::provider::MessageMetadata;
1216        use zeph_memory::graph::GraphStore;
1217
1218        fn enabled_graph_config() -> GraphConfig {
1219            GraphConfig {
1220                enabled: true,
1221                ..GraphConfig::default()
1222            }
1223        }
1224
1225        async fn agent_with_graph(
1226            provider: &AnyProvider,
1227            config: GraphConfig,
1228        ) -> Agent<MockChannel> {
1229            let memory =
1230                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1231            let cid = memory.sqlite().create_conversation().await.unwrap();
1232            Agent::new(
1233                provider.clone(),
1234                MockChannel::new(vec![]),
1235                create_test_registry(),
1236                None,
1237                5,
1238                MockToolExecutor::no_tools(),
1239            )
1240            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1241            .with_graph_config(config)
1242        }
1243
1244        #[tokio::test]
1245        async fn injection_flag_guard_skips_extraction() {
1246            // has_injection_flags=true → extraction must be skipped; no counter in graph_metadata.
1247            let provider = mock_provider(vec![]);
1248            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1249            let pool = agent
1250                .services
1251                .memory
1252                .persistence
1253                .memory
1254                .as_ref()
1255                .unwrap()
1256                .sqlite()
1257                .pool()
1258                .clone();
1259
1260            agent
1261                .enqueue_graph_extraction_task("I use Rust", true, false)
1262                .await;
1263
1264            // Give any accidental spawn time to settle.
1265            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1266
1267            let store = GraphStore::new(pool);
1268            let count = store.get_metadata("extraction_count").await.unwrap();
1269            assert!(
1270                count.is_none(),
1271                "injection flag must prevent extraction_count from being written"
1272            );
1273        }
1274
1275        #[tokio::test]
1276        async fn disabled_config_guard_skips_extraction() {
1277            // graph.enabled=false → extraction must be skipped.
1278            let provider = mock_provider(vec![]);
1279            let disabled_cfg = GraphConfig {
1280                enabled: false,
1281                ..GraphConfig::default()
1282            };
1283            let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1284            let pool = agent
1285                .services
1286                .memory
1287                .persistence
1288                .memory
1289                .as_ref()
1290                .unwrap()
1291                .sqlite()
1292                .pool()
1293                .clone();
1294
1295            agent
1296                .enqueue_graph_extraction_task("I use Rust", false, false)
1297                .await;
1298
1299            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1300
1301            let store = GraphStore::new(pool);
1302            let count = store.get_metadata("extraction_count").await.unwrap();
1303            assert!(
1304                count.is_none(),
1305                "disabled graph config must prevent extraction"
1306            );
1307        }
1308
1309        #[tokio::test]
1310        async fn happy_path_fires_extraction() {
1311            // With enabled config and no injection flags, extraction is spawned.
1312            // MockProvider returns None (no entities), but the counter must be incremented.
1313            let provider = mock_provider(vec![]);
1314            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1315            let pool = agent
1316                .services
1317                .memory
1318                .persistence
1319                .memory
1320                .as_ref()
1321                .unwrap()
1322                .sqlite()
1323                .pool()
1324                .clone();
1325
1326            agent
1327                .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1328                .await;
1329
1330            // Wait for the spawned task to complete.
1331            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1332
1333            let store = GraphStore::new(pool);
1334            let count = store.get_metadata("extraction_count").await.unwrap();
1335            assert!(
1336                count.is_some(),
1337                "happy-path extraction must increment extraction_count"
1338            );
1339        }
1340
1341        #[tokio::test]
1342        async fn tool_result_parts_guard_skips_extraction() {
1343            // FIX-1 regression: has_tool_result_parts=true → extraction must be skipped.
1344            // Tool result messages contain raw structured output (TOML, JSON, code) — not
1345            // conversational content. Extracting entities from them produces graph noise.
1346            let provider = mock_provider(vec![]);
1347            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1348            let pool = agent
1349                .services
1350                .memory
1351                .persistence
1352                .memory
1353                .as_ref()
1354                .unwrap()
1355                .sqlite()
1356                .pool()
1357                .clone();
1358
1359            agent
1360                .enqueue_graph_extraction_task(
1361                    "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1362                    false,
1363                    true, // has_tool_result_parts
1364                )
1365                .await;
1366
1367            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1368
1369            let store = GraphStore::new(pool);
1370            let count = store.get_metadata("extraction_count").await.unwrap();
1371            assert!(
1372                count.is_none(),
1373                "tool result message must not trigger graph extraction"
1374            );
1375        }
1376
1377        #[tokio::test]
1378        async fn context_filter_excludes_tool_result_messages() {
1379            // FIX-2: context_messages must not include tool result user messages.
1380            // When enqueue_graph_extraction_task collects context, it filters out
1381            // Role::User messages that contain ToolResult parts — only conversational
1382            // user messages are included as extraction context.
1383            //
1384            // This test verifies the guard fires: a tool result message alone is passed
1385            // (has_tool_result_parts=true) → extraction is skipped entirely, so context
1386            // filtering is not exercised. We verify FIX-2 by ensuring a prior tool result
1387            // message in agent.msg.messages is excluded when a subsequent conversational message
1388            // triggers extraction.
1389            let provider = mock_provider(vec![]);
1390            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1391
1392            // Add a tool result message to the agent's message history — this simulates
1393            // a tool call response that arrived before the current conversational turn.
1394            agent.msg.messages.push(Message {
1395                role: Role::User,
1396                content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1397                parts: vec![MessagePart::ToolResult {
1398                    tool_use_id: "abc".to_owned(),
1399                    content: "provider_type = \"openai\"".to_owned(),
1400                    is_error: false,
1401                }],
1402                metadata: MessageMetadata::default(),
1403            });
1404
1405            let pool = agent
1406                .services
1407                .memory
1408                .persistence
1409                .memory
1410                .as_ref()
1411                .unwrap()
1412                .sqlite()
1413                .pool()
1414                .clone();
1415
1416            // Trigger extraction for a conversational message (not a tool result).
1417            agent
1418                .enqueue_graph_extraction_task(
1419                    "I prefer Rust for systems programming",
1420                    false,
1421                    false,
1422                )
1423                .await;
1424
1425            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1426
1427            // Extraction must have fired (conversational message, no injection flags).
1428            let store = GraphStore::new(pool);
1429            let count = store.get_metadata("extraction_count").await.unwrap();
1430            assert!(
1431                count.is_some(),
1432                "conversational message must trigger extraction even with prior tool result in history"
1433            );
1434        }
1435    }
1436
1437    // R-PERS-01: unit tests for enqueue_persona_extraction_task guard conditions.
1438    mod persona_extraction_guards {
1439        use super::*;
1440        use zeph_config::PersonaConfig;
1441        use zeph_llm::provider::MessageMetadata;
1442
1443        fn enabled_persona_config() -> PersonaConfig {
1444            PersonaConfig {
1445                enabled: true,
1446                min_messages: 1,
1447                ..PersonaConfig::default()
1448            }
1449        }
1450
1451        async fn agent_with_persona(
1452            provider: &AnyProvider,
1453            config: PersonaConfig,
1454        ) -> Agent<MockChannel> {
1455            let memory =
1456                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1457            let cid = memory.sqlite().create_conversation().await.unwrap();
1458            let mut agent = Agent::new(
1459                provider.clone(),
1460                MockChannel::new(vec![]),
1461                create_test_registry(),
1462                None,
1463                5,
1464                MockToolExecutor::no_tools(),
1465            )
1466            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1467            agent.services.memory.extraction.persona_config = config;
1468            agent
1469        }
1470
1471        #[tokio::test]
1472        async fn disabled_config_skips_spawn() {
1473            // persona.enabled=false → nothing is spawned; persona_memory stays empty.
1474            let provider = mock_provider(vec![]);
1475            let mut agent = agent_with_persona(
1476                &provider,
1477                PersonaConfig {
1478                    enabled: false,
1479                    ..PersonaConfig::default()
1480                },
1481            )
1482            .await;
1483
1484            // Inject a user message so message count is above threshold.
1485            agent.msg.messages.push(zeph_llm::provider::Message {
1486                role: Role::User,
1487                content: "I prefer Rust for systems programming".to_owned(),
1488                parts: vec![],
1489                metadata: MessageMetadata::default(),
1490            });
1491
1492            agent.enqueue_persona_extraction_task();
1493
1494            let store = agent
1495                .services
1496                .memory
1497                .persistence
1498                .memory
1499                .as_ref()
1500                .unwrap()
1501                .sqlite()
1502                .clone();
1503            let count = store.count_persona_facts().await.unwrap();
1504            assert_eq!(count, 0, "disabled persona config must not write any facts");
1505        }
1506
1507        #[tokio::test]
1508        async fn below_min_messages_skips_spawn() {
1509            // min_messages=3 but only 2 user messages → should skip.
1510            let provider = mock_provider(vec![]);
1511            let mut agent = agent_with_persona(
1512                &provider,
1513                PersonaConfig {
1514                    enabled: true,
1515                    min_messages: 3,
1516                    ..PersonaConfig::default()
1517                },
1518            )
1519            .await;
1520
1521            for text in ["I use Rust", "I prefer async code"] {
1522                agent.msg.messages.push(zeph_llm::provider::Message {
1523                    role: Role::User,
1524                    content: text.to_owned(),
1525                    parts: vec![],
1526                    metadata: MessageMetadata::default(),
1527                });
1528            }
1529
1530            agent.enqueue_persona_extraction_task();
1531
1532            let store = agent
1533                .services
1534                .memory
1535                .persistence
1536                .memory
1537                .as_ref()
1538                .unwrap()
1539                .sqlite()
1540                .clone();
1541            let count = store.count_persona_facts().await.unwrap();
1542            assert_eq!(
1543                count, 0,
1544                "below min_messages threshold must not trigger extraction"
1545            );
1546        }
1547
1548        #[tokio::test]
1549        async fn no_memory_skips_spawn() {
1550            // memory=None → must exit early without panic.
1551            let provider = mock_provider(vec![]);
1552            let channel = MockChannel::new(vec![]);
1553            let registry = create_test_registry();
1554            let executor = MockToolExecutor::no_tools();
1555            let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1556            agent.services.memory.extraction.persona_config = enabled_persona_config();
1557            agent.msg.messages.push(zeph_llm::provider::Message {
1558                role: Role::User,
1559                content: "I like Rust".to_owned(),
1560                parts: vec![],
1561                metadata: MessageMetadata::default(),
1562            });
1563
1564            // Must not panic even without memory.
1565            agent.enqueue_persona_extraction_task();
1566        }
1567
1568        #[tokio::test]
1569        async fn enabled_enough_messages_spawns_extraction() {
1570            // enabled=true, min_messages=1, self-referential message → extraction runs eagerly
1571            // (not fire-and-forget) and chat() is called on the provider, verified via MockProvider.
1572            use zeph_llm::mock::MockProvider;
1573            let (mock, recorded) = MockProvider::default().with_recording();
1574            let provider = AnyProvider::Mock(mock);
1575            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1576
1577            agent.msg.messages.push(zeph_llm::provider::Message {
1578                role: Role::User,
1579                content: "I prefer Rust for systems programming".to_owned(),
1580                parts: vec![],
1581                metadata: MessageMetadata::default(),
1582            });
1583
1584            agent.enqueue_persona_extraction_task();
1585
1586            // Persona extraction runs via BackgroundSupervisor. Wait for tasks to complete.
1587            agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1588
1589            let calls = recorded.lock().unwrap();
1590            assert!(
1591                !calls.is_empty(),
1592                "happy-path: provider.chat() must be called when extraction completes"
1593            );
1594        }
1595
1596        #[tokio::test]
1597        async fn messages_capped_at_eight() {
1598            // More than 8 user messages → only 8 are passed to extraction.
1599            // Each message contains "I" so self-referential gate passes.
1600            use zeph_llm::mock::MockProvider;
1601            let (mock, recorded) = MockProvider::default().with_recording();
1602            let provider = AnyProvider::Mock(mock);
1603            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1604
1605            for i in 0..12u32 {
1606                agent.msg.messages.push(zeph_llm::provider::Message {
1607                    role: Role::User,
1608                    content: format!("I like message {i}"),
1609                    parts: vec![],
1610                    metadata: MessageMetadata::default(),
1611                });
1612            }
1613
1614            agent.enqueue_persona_extraction_task();
1615
1616            // Allow the background task to complete before asserting.
1617            agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1618
1619            // Verify extraction ran (provider was called).
1620            let calls = recorded.lock().unwrap();
1621            assert!(
1622                !calls.is_empty(),
1623                "extraction must run when enough messages present"
1624            );
1625            // Verify the prompt sent to the provider does not contain messages beyond the 8th.
1626            let prompt = &calls[0];
1627            let user_text = prompt
1628                .iter()
1629                .filter(|m| m.role == Role::User)
1630                .map(|m| m.content.as_str())
1631                .collect::<Vec<_>>()
1632                .join(" ");
1633            // Messages 8..11 ("I like message 8".."I like message 11") must not appear.
1634            assert!(
1635                !user_text.contains("I like message 8"),
1636                "message index 8 must be excluded from extraction input"
1637            );
1638        }
1639
1640        #[test]
1641        fn long_message_truncated_at_char_boundary() {
1642            // Directly verify the per-message truncation logic applied in
1643            // enqueue_persona_extraction_task: a content > 2048 bytes must be capped
1644            // to exactly floor_char_boundary(2048).
1645            let long_content = "x".repeat(3000);
1646            let truncated = if long_content.len() > 2048 {
1647                long_content[..long_content.floor_char_boundary(2048)].to_owned()
1648            } else {
1649                long_content.clone()
1650            };
1651            assert_eq!(
1652                truncated.len(),
1653                2048,
1654                "ASCII content must be truncated to exactly 2048 bytes"
1655            );
1656
1657            // Multi-byte boundary: build a string whose char boundary falls before 2048.
1658            let multi = "é".repeat(1500); // each 'é' is 2 bytes → 3000 bytes total
1659            let truncated_multi = if multi.len() > 2048 {
1660                multi[..multi.floor_char_boundary(2048)].to_owned()
1661            } else {
1662                multi.clone()
1663            };
1664            assert!(
1665                truncated_multi.len() <= 2048,
1666                "multi-byte content must not exceed 2048 bytes"
1667            );
1668            assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1669        }
1670    }
1671
1672    #[tokio::test]
1673    async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1674        let provider = mock_provider(vec![]);
1675        let channel = MockChannel::new(vec![]);
1676        let registry = create_test_registry();
1677        let executor = MockToolExecutor::no_tools();
1678
1679        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1680        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1681        let cid = memory.sqlite().create_conversation().await.unwrap();
1682
1683        // autosave_assistant=false — but User role always takes embedding path
1684        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1685            .with_metrics(tx)
1686            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1687        agent.services.memory.persistence.autosave_assistant = false;
1688        agent.services.memory.persistence.autosave_min_length = 20;
1689
1690        let long_user_msg = "A".repeat(100);
1691        agent
1692            .persist_message(Role::User, &long_user_msg, &[], false)
1693            .await;
1694
1695        let history = agent
1696            .services
1697            .memory
1698            .persistence
1699            .memory
1700            .as_ref()
1701            .unwrap()
1702            .sqlite()
1703            .load_history(cid, 50)
1704            .await
1705            .unwrap();
1706        assert_eq!(history.len(), 1, "user message must be saved");
1707        // User messages go through remember_with_parts (embedding path).
1708        // sqlite_message_count must increment regardless of Qdrant availability.
1709        assert_eq!(rx.borrow().sqlite_message_count, 1);
1710    }
1711
1712    // Round-trip tests: verify that persist_message saves the correct parts and they
1713    // are restored correctly by load_history.
1714
1715    #[tokio::test]
1716    async fn persist_message_saves_correct_tool_use_parts() {
1717        use zeph_llm::provider::MessagePart;
1718
1719        let provider = mock_provider(vec![]);
1720        let channel = MockChannel::new(vec![]);
1721        let registry = create_test_registry();
1722        let executor = MockToolExecutor::no_tools();
1723
1724        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1725        let cid = memory.sqlite().create_conversation().await.unwrap();
1726
1727        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1728            std::sync::Arc::new(memory),
1729            cid,
1730            50,
1731            5,
1732            100,
1733        );
1734
1735        let parts = vec![MessagePart::ToolUse {
1736            id: "call_abc123".to_string(),
1737            name: "read_file".to_string(),
1738            input: serde_json::json!({"path": "/tmp/test.txt"}),
1739        }];
1740        let content = "[tool_use: read_file(call_abc123)]";
1741
1742        agent
1743            .persist_message(Role::Assistant, content, &parts, false)
1744            .await;
1745
1746        let history = agent
1747            .services
1748            .memory
1749            .persistence
1750            .memory
1751            .as_ref()
1752            .unwrap()
1753            .sqlite()
1754            .load_history(cid, 50)
1755            .await
1756            .unwrap();
1757
1758        assert_eq!(history.len(), 1);
1759        assert_eq!(history[0].role, Role::Assistant);
1760        assert_eq!(history[0].content, content);
1761        assert_eq!(history[0].parts.len(), 1);
1762        match &history[0].parts[0] {
1763            MessagePart::ToolUse { id, name, .. } => {
1764                assert_eq!(id, "call_abc123");
1765                assert_eq!(name, "read_file");
1766            }
1767            other => panic!("expected ToolUse part, got {other:?}"),
1768        }
1769        // Regression guard: assistant message must NOT have ToolResult parts
1770        assert!(
1771            !history[0]
1772                .parts
1773                .iter()
1774                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1775            "assistant message must not contain ToolResult parts"
1776        );
1777    }
1778
1779    #[tokio::test]
1780    async fn persist_message_saves_correct_tool_result_parts() {
1781        use zeph_llm::provider::MessagePart;
1782
1783        let provider = mock_provider(vec![]);
1784        let channel = MockChannel::new(vec![]);
1785        let registry = create_test_registry();
1786        let executor = MockToolExecutor::no_tools();
1787
1788        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1789        let cid = memory.sqlite().create_conversation().await.unwrap();
1790
1791        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1792            std::sync::Arc::new(memory),
1793            cid,
1794            50,
1795            5,
1796            100,
1797        );
1798
1799        let parts = vec![MessagePart::ToolResult {
1800            tool_use_id: "call_abc123".to_string(),
1801            content: "file contents here".to_string(),
1802            is_error: false,
1803        }];
1804        let content = "[tool_result: call_abc123]\nfile contents here";
1805
1806        agent
1807            .persist_message(Role::User, content, &parts, false)
1808            .await;
1809
1810        let history = agent
1811            .services
1812            .memory
1813            .persistence
1814            .memory
1815            .as_ref()
1816            .unwrap()
1817            .sqlite()
1818            .load_history(cid, 50)
1819            .await
1820            .unwrap();
1821
1822        assert_eq!(history.len(), 1);
1823        assert_eq!(history[0].role, Role::User);
1824        assert_eq!(history[0].content, content);
1825        assert_eq!(history[0].parts.len(), 1);
1826        match &history[0].parts[0] {
1827            MessagePart::ToolResult {
1828                tool_use_id,
1829                content: result_content,
1830                is_error,
1831            } => {
1832                assert_eq!(tool_use_id, "call_abc123");
1833                assert_eq!(result_content, "file contents here");
1834                assert!(!is_error);
1835            }
1836            other => panic!("expected ToolResult part, got {other:?}"),
1837        }
1838        // Regression guard: user message with ToolResult must NOT have ToolUse parts
1839        assert!(
1840            !history[0]
1841                .parts
1842                .iter()
1843                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1844            "user ToolResult message must not contain ToolUse parts"
1845        );
1846    }
1847
1848    #[tokio::test]
1849    async fn persist_message_roundtrip_preserves_role_part_alignment() {
1850        use zeph_llm::provider::MessagePart;
1851
1852        let provider = mock_provider(vec![]);
1853        let channel = MockChannel::new(vec![]);
1854        let registry = create_test_registry();
1855        let executor = MockToolExecutor::no_tools();
1856
1857        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1858        let cid = memory.sqlite().create_conversation().await.unwrap();
1859
1860        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1861            std::sync::Arc::new(memory),
1862            cid,
1863            50,
1864            5,
1865            100,
1866        );
1867
1868        // Persist assistant message with ToolUse parts
1869        let assistant_parts = vec![MessagePart::ToolUse {
1870            id: "id_1".to_string(),
1871            name: "list_dir".to_string(),
1872            input: serde_json::json!({"path": "/tmp"}),
1873        }];
1874        agent
1875            .persist_message(
1876                Role::Assistant,
1877                "[tool_use: list_dir(id_1)]",
1878                &assistant_parts,
1879                false,
1880            )
1881            .await;
1882
1883        // Persist user message with ToolResult parts
1884        let user_parts = vec![MessagePart::ToolResult {
1885            tool_use_id: "id_1".to_string(),
1886            content: "file1.txt\nfile2.txt".to_string(),
1887            is_error: false,
1888        }];
1889        agent
1890            .persist_message(
1891                Role::User,
1892                "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1893                &user_parts,
1894                false,
1895            )
1896            .await;
1897
1898        let history = agent
1899            .services
1900            .memory
1901            .persistence
1902            .memory
1903            .as_ref()
1904            .unwrap()
1905            .sqlite()
1906            .load_history(cid, 50)
1907            .await
1908            .unwrap();
1909
1910        assert_eq!(history.len(), 2);
1911
1912        // First message: assistant + ToolUse
1913        assert_eq!(history[0].role, Role::Assistant);
1914        assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1915        assert!(
1916            matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1917            "first message must be assistant ToolUse"
1918        );
1919
1920        // Second message: user + ToolResult
1921        assert_eq!(history[1].role, Role::User);
1922        assert_eq!(
1923            history[1].content,
1924            "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1925        );
1926        assert!(
1927            matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1928            "second message must be user ToolResult"
1929        );
1930
1931        // Cross-role regression guard: no swapped parts
1932        assert!(
1933            !history[0]
1934                .parts
1935                .iter()
1936                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1937            "assistant message must not have ToolResult parts"
1938        );
1939        assert!(
1940            !history[1]
1941                .parts
1942                .iter()
1943                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1944            "user message must not have ToolUse parts"
1945        );
1946    }
1947
1948    #[tokio::test]
1949    async fn persist_message_saves_correct_tool_output_parts() {
1950        use zeph_llm::provider::MessagePart;
1951
1952        let provider = mock_provider(vec![]);
1953        let channel = MockChannel::new(vec![]);
1954        let registry = create_test_registry();
1955        let executor = MockToolExecutor::no_tools();
1956
1957        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1958        let cid = memory.sqlite().create_conversation().await.unwrap();
1959
1960        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1961            std::sync::Arc::new(memory),
1962            cid,
1963            50,
1964            5,
1965            100,
1966        );
1967
1968        let parts = vec![MessagePart::ToolOutput {
1969            tool_name: "shell".into(),
1970            body: "hello from shell".to_string(),
1971            compacted_at: None,
1972        }];
1973        let content = "[tool: shell]\nhello from shell";
1974
1975        agent
1976            .persist_message(Role::User, content, &parts, false)
1977            .await;
1978
1979        let history = agent
1980            .services
1981            .memory
1982            .persistence
1983            .memory
1984            .as_ref()
1985            .unwrap()
1986            .sqlite()
1987            .load_history(cid, 50)
1988            .await
1989            .unwrap();
1990
1991        assert_eq!(history.len(), 1);
1992        assert_eq!(history[0].role, Role::User);
1993        assert_eq!(history[0].content, content);
1994        assert_eq!(history[0].parts.len(), 1);
1995        match &history[0].parts[0] {
1996            MessagePart::ToolOutput {
1997                tool_name,
1998                body,
1999                compacted_at,
2000            } => {
2001                assert_eq!(tool_name, "shell");
2002                assert_eq!(body, "hello from shell");
2003                assert!(compacted_at.is_none());
2004            }
2005            other => panic!("expected ToolOutput part, got {other:?}"),
2006        }
2007    }
2008
2009    // --- sanitize_tool_pairs unit tests ---
2010
2011    #[tokio::test]
2012    async fn load_history_removes_trailing_orphan_tool_use() {
2013        use zeph_llm::provider::MessagePart;
2014
2015        let provider = mock_provider(vec![]);
2016        let channel = MockChannel::new(vec![]);
2017        let registry = create_test_registry();
2018        let executor = MockToolExecutor::no_tools();
2019
2020        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2021        let cid = memory.sqlite().create_conversation().await.unwrap();
2022        let sqlite = memory.sqlite();
2023
2024        // user message (normal)
2025        sqlite
2026            .save_message(cid, "user", "do something with a tool")
2027            .await
2028            .unwrap();
2029
2030        // assistant message with ToolUse parts — no following tool_result (orphan)
2031        let parts = serde_json::to_string(&[MessagePart::ToolUse {
2032            id: "call_orphan".to_string(),
2033            name: "shell".to_string(),
2034            input: serde_json::json!({"command": "ls"}),
2035        }])
2036        .unwrap();
2037        sqlite
2038            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2039            .await
2040            .unwrap();
2041
2042        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2043            std::sync::Arc::new(memory),
2044            cid,
2045            50,
2046            5,
2047            100,
2048        );
2049
2050        let messages_before = agent.msg.messages.len();
2051        agent.load_history().await.unwrap();
2052
2053        // Only the user message should be loaded; orphaned assistant tool_use removed.
2054        assert_eq!(
2055            agent.msg.messages.len(),
2056            messages_before + 1,
2057            "orphaned trailing tool_use must be removed"
2058        );
2059        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2060    }
2061
2062    #[tokio::test]
2063    async fn load_history_removes_leading_orphan_tool_result() {
2064        use zeph_llm::provider::MessagePart;
2065
2066        let provider = mock_provider(vec![]);
2067        let channel = MockChannel::new(vec![]);
2068        let registry = create_test_registry();
2069        let executor = MockToolExecutor::no_tools();
2070
2071        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2072        let cid = memory.sqlite().create_conversation().await.unwrap();
2073        let sqlite = memory.sqlite();
2074
2075        // Leading orphan: user message with ToolResult but no preceding tool_use
2076        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2077            tool_use_id: "call_missing".to_string(),
2078            content: "result data".to_string(),
2079            is_error: false,
2080        }])
2081        .unwrap();
2082        sqlite
2083            .save_message_with_parts(
2084                cid,
2085                "user",
2086                "[tool_result: call_missing]\nresult data",
2087                &result_parts,
2088            )
2089            .await
2090            .unwrap();
2091
2092        // A valid assistant reply after the orphan
2093        sqlite
2094            .save_message(cid, "assistant", "here is my response")
2095            .await
2096            .unwrap();
2097
2098        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2099            std::sync::Arc::new(memory),
2100            cid,
2101            50,
2102            5,
2103            100,
2104        );
2105
2106        let messages_before = agent.msg.messages.len();
2107        agent.load_history().await.unwrap();
2108
2109        // Orphaned leading tool_result removed; only assistant message kept.
2110        assert_eq!(
2111            agent.msg.messages.len(),
2112            messages_before + 1,
2113            "orphaned leading tool_result must be removed"
2114        );
2115        assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2116    }
2117
2118    #[tokio::test]
2119    async fn load_history_preserves_complete_tool_pairs() {
2120        use zeph_llm::provider::MessagePart;
2121
2122        let provider = mock_provider(vec![]);
2123        let channel = MockChannel::new(vec![]);
2124        let registry = create_test_registry();
2125        let executor = MockToolExecutor::no_tools();
2126
2127        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2128        let cid = memory.sqlite().create_conversation().await.unwrap();
2129        let sqlite = memory.sqlite();
2130
2131        // Complete tool_use / tool_result pair
2132        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2133            id: "call_ok".to_string(),
2134            name: "shell".to_string(),
2135            input: serde_json::json!({"command": "pwd"}),
2136        }])
2137        .unwrap();
2138        sqlite
2139            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2140            .await
2141            .unwrap();
2142
2143        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2144            tool_use_id: "call_ok".to_string(),
2145            content: "/home/user".to_string(),
2146            is_error: false,
2147        }])
2148        .unwrap();
2149        sqlite
2150            .save_message_with_parts(
2151                cid,
2152                "user",
2153                "[tool_result: call_ok]\n/home/user",
2154                &result_parts,
2155            )
2156            .await
2157            .unwrap();
2158
2159        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2160            std::sync::Arc::new(memory),
2161            cid,
2162            50,
2163            5,
2164            100,
2165        );
2166
2167        let messages_before = agent.msg.messages.len();
2168        agent.load_history().await.unwrap();
2169
2170        // Both messages must be preserved.
2171        assert_eq!(
2172            agent.msg.messages.len(),
2173            messages_before + 2,
2174            "complete tool_use/tool_result pair must be preserved"
2175        );
2176        assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2177        assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2178    }
2179
2180    #[tokio::test]
2181    async fn load_history_handles_multiple_trailing_orphans() {
2182        use zeph_llm::provider::MessagePart;
2183
2184        let provider = mock_provider(vec![]);
2185        let channel = MockChannel::new(vec![]);
2186        let registry = create_test_registry();
2187        let executor = MockToolExecutor::no_tools();
2188
2189        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2190        let cid = memory.sqlite().create_conversation().await.unwrap();
2191        let sqlite = memory.sqlite();
2192
2193        // Normal user message
2194        sqlite.save_message(cid, "user", "start").await.unwrap();
2195
2196        // First orphaned tool_use
2197        let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2198            id: "call_1".to_string(),
2199            name: "shell".to_string(),
2200            input: serde_json::json!({}),
2201        }])
2202        .unwrap();
2203        sqlite
2204            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2205            .await
2206            .unwrap();
2207
2208        // Second orphaned tool_use (consecutive, no tool_result between them)
2209        let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2210            id: "call_2".to_string(),
2211            name: "read_file".to_string(),
2212            input: serde_json::json!({}),
2213        }])
2214        .unwrap();
2215        sqlite
2216            .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2217            .await
2218            .unwrap();
2219
2220        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2221            std::sync::Arc::new(memory),
2222            cid,
2223            50,
2224            5,
2225            100,
2226        );
2227
2228        let messages_before = agent.msg.messages.len();
2229        agent.load_history().await.unwrap();
2230
2231        // Both orphaned tool_use messages removed; only the user message kept.
2232        assert_eq!(
2233            agent.msg.messages.len(),
2234            messages_before + 1,
2235            "all trailing orphaned tool_use messages must be removed"
2236        );
2237        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2238    }
2239
2240    #[tokio::test]
2241    async fn load_history_no_tool_messages_unchanged() {
2242        let provider = mock_provider(vec![]);
2243        let channel = MockChannel::new(vec![]);
2244        let registry = create_test_registry();
2245        let executor = MockToolExecutor::no_tools();
2246
2247        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2248        let cid = memory.sqlite().create_conversation().await.unwrap();
2249        let sqlite = memory.sqlite();
2250
2251        sqlite.save_message(cid, "user", "hello").await.unwrap();
2252        sqlite
2253            .save_message(cid, "assistant", "hi there")
2254            .await
2255            .unwrap();
2256        sqlite
2257            .save_message(cid, "user", "how are you?")
2258            .await
2259            .unwrap();
2260
2261        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2262            std::sync::Arc::new(memory),
2263            cid,
2264            50,
2265            5,
2266            100,
2267        );
2268
2269        let messages_before = agent.msg.messages.len();
2270        agent.load_history().await.unwrap();
2271
2272        // All three plain messages must be preserved.
2273        assert_eq!(
2274            agent.msg.messages.len(),
2275            messages_before + 3,
2276            "plain messages without tool parts must pass through unchanged"
2277        );
2278    }
2279
2280    #[tokio::test]
2281    async fn load_history_removes_both_leading_and_trailing_orphans() {
2282        use zeph_llm::provider::MessagePart;
2283
2284        let provider = mock_provider(vec![]);
2285        let channel = MockChannel::new(vec![]);
2286        let registry = create_test_registry();
2287        let executor = MockToolExecutor::no_tools();
2288
2289        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2290        let cid = memory.sqlite().create_conversation().await.unwrap();
2291        let sqlite = memory.sqlite();
2292
2293        // Leading orphan: user message with ToolResult, no preceding tool_use
2294        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2295            tool_use_id: "call_leading".to_string(),
2296            content: "orphaned result".to_string(),
2297            is_error: false,
2298        }])
2299        .unwrap();
2300        sqlite
2301            .save_message_with_parts(
2302                cid,
2303                "user",
2304                "[tool_result: call_leading]\norphaned result",
2305                &result_parts,
2306            )
2307            .await
2308            .unwrap();
2309
2310        // Valid middle messages
2311        sqlite
2312            .save_message(cid, "user", "what is 2+2?")
2313            .await
2314            .unwrap();
2315        sqlite.save_message(cid, "assistant", "4").await.unwrap();
2316
2317        // Trailing orphan: assistant message with ToolUse, no following tool_result
2318        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2319            id: "call_trailing".to_string(),
2320            name: "shell".to_string(),
2321            input: serde_json::json!({"command": "date"}),
2322        }])
2323        .unwrap();
2324        sqlite
2325            .save_message_with_parts(
2326                cid,
2327                "assistant",
2328                "[tool_use: shell(call_trailing)]",
2329                &use_parts,
2330            )
2331            .await
2332            .unwrap();
2333
2334        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2335            std::sync::Arc::new(memory),
2336            cid,
2337            50,
2338            5,
2339            100,
2340        );
2341
2342        let messages_before = agent.msg.messages.len();
2343        agent.load_history().await.unwrap();
2344
2345        // Both orphans removed; only the 2 valid middle messages kept.
2346        assert_eq!(
2347            agent.msg.messages.len(),
2348            messages_before + 2,
2349            "both leading and trailing orphans must be removed"
2350        );
2351        assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2352        assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2353        assert_eq!(
2354            agent.msg.messages[messages_before + 1].role,
2355            Role::Assistant
2356        );
2357        assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2358    }
2359
2360    /// RC1 regression: mid-history assistant[`ToolUse`] without a following user[`ToolResult`]
2361    /// must have its `ToolUse` parts stripped (text preserved). The message count stays the same
2362    /// because the assistant message has a text content fallback; only `ToolUse` parts are
2363    /// removed.
2364    #[tokio::test]
2365    async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2366        use zeph_llm::provider::MessagePart;
2367
2368        let provider = mock_provider(vec![]);
2369        let channel = MockChannel::new(vec![]);
2370        let registry = create_test_registry();
2371        let executor = MockToolExecutor::no_tools();
2372
2373        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2374        let cid = memory.sqlite().create_conversation().await.unwrap();
2375        let sqlite = memory.sqlite();
2376
2377        // Normal first exchange.
2378        sqlite
2379            .save_message(cid, "user", "first question")
2380            .await
2381            .unwrap();
2382        sqlite
2383            .save_message(cid, "assistant", "first answer")
2384            .await
2385            .unwrap();
2386
2387        // Mid-history orphan: assistant with ToolUse but NO following ToolResult user message.
2388        // This models the compaction-split scenario (RC2) where replace_conversation hid the
2389        // user[ToolResult] but left the assistant[ToolUse] visible.
2390        let use_parts = serde_json::to_string(&[
2391            MessagePart::ToolUse {
2392                id: "call_mid_1".to_string(),
2393                name: "shell".to_string(),
2394                input: serde_json::json!({"command": "ls"}),
2395            },
2396            MessagePart::Text {
2397                text: "Let me check the files.".to_string(),
2398            },
2399        ])
2400        .unwrap();
2401        sqlite
2402            .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2403            .await
2404            .unwrap();
2405
2406        // Another normal exchange after the orphan.
2407        sqlite
2408            .save_message(cid, "user", "second question")
2409            .await
2410            .unwrap();
2411        sqlite
2412            .save_message(cid, "assistant", "second answer")
2413            .await
2414            .unwrap();
2415
2416        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2417            std::sync::Arc::new(memory),
2418            cid,
2419            50,
2420            5,
2421            100,
2422        );
2423
2424        let messages_before = agent.msg.messages.len();
2425        agent.load_history().await.unwrap();
2426
2427        // All 5 messages remain (orphan message kept because it has text), but the orphaned
2428        // message must have its ToolUse parts stripped.
2429        assert_eq!(
2430            agent.msg.messages.len(),
2431            messages_before + 5,
2432            "message count must be 5 (orphan message kept — has text content)"
2433        );
2434
2435        // The orphaned assistant message (index 2 in the loaded slice) must have no ToolUse parts.
2436        let orphan = &agent.msg.messages[messages_before + 2];
2437        assert_eq!(orphan.role, Role::Assistant);
2438        assert!(
2439            !orphan
2440                .parts
2441                .iter()
2442                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2443            "orphaned ToolUse parts must be stripped from mid-history message"
2444        );
2445        // Text part must be preserved.
2446        assert!(
2447            orphan.parts.iter().any(
2448                |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2449            ),
2450            "text content of orphaned assistant message must be preserved"
2451        );
2452    }
2453
2454    /// RC3 regression: a user message with empty `content` but non-empty `parts` (`ToolResult`)
2455    /// must NOT be skipped by `load_history`. Previously the empty-content check dropped these
2456    /// messages before `sanitize_tool_pairs` ran, leaving the preceding assistant `ToolUse`
2457    /// orphaned.
2458    #[tokio::test]
2459    async fn load_history_keeps_tool_only_user_message() {
2460        use zeph_llm::provider::MessagePart;
2461
2462        let provider = mock_provider(vec![]);
2463        let channel = MockChannel::new(vec![]);
2464        let registry = create_test_registry();
2465        let executor = MockToolExecutor::no_tools();
2466
2467        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2468        let cid = memory.sqlite().create_conversation().await.unwrap();
2469        let sqlite = memory.sqlite();
2470
2471        // Assistant sends a ToolUse.
2472        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2473            id: "call_rc3".to_string(),
2474            name: "memory_save".to_string(),
2475            input: serde_json::json!({"content": "something"}),
2476        }])
2477        .unwrap();
2478        sqlite
2479            .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2480            .await
2481            .unwrap();
2482
2483        // User message has empty text content but carries ToolResult in parts — native tool pattern.
2484        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2485            tool_use_id: "call_rc3".to_string(),
2486            content: "saved".to_string(),
2487            is_error: false,
2488        }])
2489        .unwrap();
2490        sqlite
2491            .save_message_with_parts(cid, "user", "", &result_parts)
2492            .await
2493            .unwrap();
2494
2495        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2496
2497        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2498            std::sync::Arc::new(memory),
2499            cid,
2500            50,
2501            5,
2502            100,
2503        );
2504
2505        let messages_before = agent.msg.messages.len();
2506        agent.load_history().await.unwrap();
2507
2508        // All 3 messages must be loaded — the empty-content ToolResult user message must NOT be
2509        // dropped.
2510        assert_eq!(
2511            agent.msg.messages.len(),
2512            messages_before + 3,
2513            "user message with empty content but ToolResult parts must not be dropped"
2514        );
2515
2516        // The user message at index 1 must still carry the ToolResult part.
2517        let user_msg = &agent.msg.messages[messages_before + 1];
2518        assert_eq!(user_msg.role, Role::User);
2519        assert!(
2520            user_msg.parts.iter().any(
2521                |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2522            ),
2523            "ToolResult part must be preserved on user message with empty content"
2524        );
2525    }
2526
2527    /// RC2 reverse pass: a user message with a `ToolResult` whose `tool_use_id` has no matching
2528    /// `ToolUse` in the preceding assistant message must be stripped by
2529    /// `strip_mid_history_orphans`.
2530    #[tokio::test]
2531    async fn strip_orphans_removes_orphaned_tool_result() {
2532        use zeph_llm::provider::MessagePart;
2533
2534        let provider = mock_provider(vec![]);
2535        let channel = MockChannel::new(vec![]);
2536        let registry = create_test_registry();
2537        let executor = MockToolExecutor::no_tools();
2538
2539        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2540        let cid = memory.sqlite().create_conversation().await.unwrap();
2541        let sqlite = memory.sqlite();
2542
2543        // Normal exchange before the orphan.
2544        sqlite.save_message(cid, "user", "hello").await.unwrap();
2545        sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2546
2547        // Assistant message that does NOT contain a ToolUse.
2548        sqlite
2549            .save_message(cid, "assistant", "plain answer")
2550            .await
2551            .unwrap();
2552
2553        // User message references a tool_use_id that was never sent by the preceding assistant.
2554        let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2555            tool_use_id: "call_nonexistent".to_string(),
2556            content: "stale result".to_string(),
2557            is_error: false,
2558        }])
2559        .unwrap();
2560        sqlite
2561            .save_message_with_parts(
2562                cid,
2563                "user",
2564                "[tool_result: call_nonexistent]\nstale result",
2565                &orphan_result_parts,
2566            )
2567            .await
2568            .unwrap();
2569
2570        sqlite
2571            .save_message(cid, "assistant", "final")
2572            .await
2573            .unwrap();
2574
2575        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2576            std::sync::Arc::new(memory),
2577            cid,
2578            50,
2579            5,
2580            100,
2581        );
2582
2583        let messages_before = agent.msg.messages.len();
2584        agent.load_history().await.unwrap();
2585
2586        // The orphaned ToolResult part must have been stripped from the user message.
2587        // The user message itself may be removed (parts empty + content non-empty) or kept with
2588        // the text content — but it must NOT retain the orphaned ToolResult part.
2589        let loaded = &agent.msg.messages[messages_before..];
2590        for msg in loaded {
2591            assert!(
2592                !msg.parts.iter().any(|p| matches!(
2593                    p,
2594                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2595                )),
2596                "orphaned ToolResult part must be stripped from history"
2597            );
2598        }
2599    }
2600
2601    /// RC2 reverse pass: a complete `tool_use` + `tool_result` pair must pass through the reverse
2602    /// orphan check intact; the fix must not strip valid `ToolResult` parts.
2603    #[tokio::test]
2604    async fn strip_orphans_keeps_complete_pair() {
2605        use zeph_llm::provider::MessagePart;
2606
2607        let provider = mock_provider(vec![]);
2608        let channel = MockChannel::new(vec![]);
2609        let registry = create_test_registry();
2610        let executor = MockToolExecutor::no_tools();
2611
2612        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2613        let cid = memory.sqlite().create_conversation().await.unwrap();
2614        let sqlite = memory.sqlite();
2615
2616        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2617            id: "call_valid".to_string(),
2618            name: "shell".to_string(),
2619            input: serde_json::json!({"command": "ls"}),
2620        }])
2621        .unwrap();
2622        sqlite
2623            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2624            .await
2625            .unwrap();
2626
2627        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2628            tool_use_id: "call_valid".to_string(),
2629            content: "file.rs".to_string(),
2630            is_error: false,
2631        }])
2632        .unwrap();
2633        sqlite
2634            .save_message_with_parts(cid, "user", "", &result_parts)
2635            .await
2636            .unwrap();
2637
2638        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2639            std::sync::Arc::new(memory),
2640            cid,
2641            50,
2642            5,
2643            100,
2644        );
2645
2646        let messages_before = agent.msg.messages.len();
2647        agent.load_history().await.unwrap();
2648
2649        assert_eq!(
2650            agent.msg.messages.len(),
2651            messages_before + 2,
2652            "complete tool_use/tool_result pair must be preserved"
2653        );
2654
2655        let user_msg = &agent.msg.messages[messages_before + 1];
2656        assert!(
2657            user_msg.parts.iter().any(|p| matches!(
2658                p,
2659                MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2660            )),
2661            "ToolResult part for a matched tool_use must not be stripped"
2662        );
2663    }
2664
2665    /// RC2 reverse pass: history with a mix of complete pairs and orphaned `ToolResult` messages.
2666    /// Orphaned `ToolResult` parts must be stripped; complete pairs must pass through intact.
2667    #[tokio::test]
2668    async fn strip_orphans_mixed_history() {
2669        use zeph_llm::provider::MessagePart;
2670
2671        let provider = mock_provider(vec![]);
2672        let channel = MockChannel::new(vec![]);
2673        let registry = create_test_registry();
2674        let executor = MockToolExecutor::no_tools();
2675
2676        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2677        let cid = memory.sqlite().create_conversation().await.unwrap();
2678        let sqlite = memory.sqlite();
2679
2680        // First: complete tool_use / tool_result pair.
2681        let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2682            id: "call_good".to_string(),
2683            name: "shell".to_string(),
2684            input: serde_json::json!({"command": "pwd"}),
2685        }])
2686        .unwrap();
2687        sqlite
2688            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2689            .await
2690            .unwrap();
2691
2692        let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2693            tool_use_id: "call_good".to_string(),
2694            content: "/home".to_string(),
2695            is_error: false,
2696        }])
2697        .unwrap();
2698        sqlite
2699            .save_message_with_parts(cid, "user", "", &result_parts_ok)
2700            .await
2701            .unwrap();
2702
2703        // Second: plain assistant message followed by an orphaned ToolResult user message.
2704        sqlite
2705            .save_message(cid, "assistant", "text only")
2706            .await
2707            .unwrap();
2708
2709        let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2710            tool_use_id: "call_ghost".to_string(),
2711            content: "ghost result".to_string(),
2712            is_error: false,
2713        }])
2714        .unwrap();
2715        sqlite
2716            .save_message_with_parts(
2717                cid,
2718                "user",
2719                "[tool_result: call_ghost]\nghost result",
2720                &orphan_parts,
2721            )
2722            .await
2723            .unwrap();
2724
2725        sqlite
2726            .save_message(cid, "assistant", "final reply")
2727            .await
2728            .unwrap();
2729
2730        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2731            std::sync::Arc::new(memory),
2732            cid,
2733            50,
2734            5,
2735            100,
2736        );
2737
2738        let messages_before = agent.msg.messages.len();
2739        agent.load_history().await.unwrap();
2740
2741        let loaded = &agent.msg.messages[messages_before..];
2742
2743        // The orphaned ToolResult part must not appear in any message.
2744        for msg in loaded {
2745            assert!(
2746                !msg.parts.iter().any(|p| matches!(
2747                    p,
2748                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2749                )),
2750                "orphaned ToolResult (call_ghost) must be stripped from history"
2751            );
2752        }
2753
2754        // The matched ToolResult must still be present on the user message following the
2755        // first assistant message.
2756        let has_good_result = loaded.iter().any(|msg| {
2757            msg.role == Role::User
2758                && msg.parts.iter().any(|p| {
2759                    matches!(
2760                        p,
2761                        MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2762                    )
2763                })
2764        });
2765        assert!(
2766            has_good_result,
2767            "matched ToolResult (call_good) must be preserved in history"
2768        );
2769    }
2770
2771    /// Regression: a properly matched `tool_use`/`tool_result` pair must NOT be touched by the
2772    /// mid-history scan — ensures the fix doesn't break valid tool exchanges.
2773    #[tokio::test]
2774    async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2775        use zeph_llm::provider::MessagePart;
2776
2777        let provider = mock_provider(vec![]);
2778        let channel = MockChannel::new(vec![]);
2779        let registry = create_test_registry();
2780        let executor = MockToolExecutor::no_tools();
2781
2782        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2783        let cid = memory.sqlite().create_conversation().await.unwrap();
2784        let sqlite = memory.sqlite();
2785
2786        sqlite
2787            .save_message(cid, "user", "run a command")
2788            .await
2789            .unwrap();
2790
2791        // Assistant sends a ToolUse.
2792        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2793            id: "call_ok".to_string(),
2794            name: "shell".to_string(),
2795            input: serde_json::json!({"command": "echo hi"}),
2796        }])
2797        .unwrap();
2798        sqlite
2799            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2800            .await
2801            .unwrap();
2802
2803        // Matching user ToolResult follows.
2804        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2805            tool_use_id: "call_ok".to_string(),
2806            content: "hi".to_string(),
2807            is_error: false,
2808        }])
2809        .unwrap();
2810        sqlite
2811            .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2812            .await
2813            .unwrap();
2814
2815        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2816
2817        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2818            std::sync::Arc::new(memory),
2819            cid,
2820            50,
2821            5,
2822            100,
2823        );
2824
2825        let messages_before = agent.msg.messages.len();
2826        agent.load_history().await.unwrap();
2827
2828        // All 4 messages preserved, tool_use parts intact.
2829        assert_eq!(
2830            agent.msg.messages.len(),
2831            messages_before + 4,
2832            "matched tool pair must not be removed"
2833        );
2834        let tool_msg = &agent.msg.messages[messages_before + 1];
2835        assert!(
2836            tool_msg
2837                .parts
2838                .iter()
2839                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2840            "matched ToolUse parts must be preserved"
2841        );
2842    }
2843
2844    /// RC5: `persist_cancelled_tool_results` must persist a tombstone user message containing
2845    /// `is_error=true` `ToolResult` parts for all `tool_calls` IDs so the preceding assistant
2846    /// `ToolUse` is never orphaned in the DB after a cancellation.
2847    #[tokio::test]
2848    async fn persist_cancelled_tool_results_pairs_tool_use() {
2849        use zeph_llm::provider::MessagePart;
2850
2851        let provider = mock_provider(vec![]);
2852        let channel = MockChannel::new(vec![]);
2853        let registry = create_test_registry();
2854        let executor = MockToolExecutor::no_tools();
2855
2856        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2857        let cid = memory.sqlite().create_conversation().await.unwrap();
2858
2859        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2860            std::sync::Arc::new(memory),
2861            cid,
2862            50,
2863            5,
2864            100,
2865        );
2866
2867        // Simulate: assistant message with two ToolUse parts already persisted.
2868        let tool_calls = vec![
2869            zeph_llm::provider::ToolUseRequest {
2870                id: "cancel_id_1".to_string(),
2871                name: "shell".to_string().into(),
2872                input: serde_json::json!({}),
2873            },
2874            zeph_llm::provider::ToolUseRequest {
2875                id: "cancel_id_2".to_string(),
2876                name: "read_file".to_string().into(),
2877                input: serde_json::json!({}),
2878            },
2879        ];
2880
2881        agent.persist_cancelled_tool_results(&tool_calls).await;
2882
2883        let history = agent
2884            .services
2885            .memory
2886            .persistence
2887            .memory
2888            .as_ref()
2889            .unwrap()
2890            .sqlite()
2891            .load_history(cid, 50)
2892            .await
2893            .unwrap();
2894
2895        // Exactly one user message must have been persisted.
2896        assert_eq!(history.len(), 1);
2897        assert_eq!(history[0].role, Role::User);
2898
2899        // It must contain is_error=true ToolResult for each tool call ID.
2900        for tc in &tool_calls {
2901            assert!(
2902                history[0].parts.iter().any(|p| matches!(
2903                    p,
2904                    MessagePart::ToolResult { tool_use_id, is_error, .. }
2905                        if tool_use_id == &tc.id && *is_error
2906                )),
2907                "tombstone ToolResult for {} must be present and is_error=true",
2908                tc.id
2909            );
2910        }
2911    }
2912
2913    // ---- Integration tests for the #2529 fix: soft-delete of legacy-content orphans ----
2914
2915    /// #2529 regression: orphaned assistant `ToolUse` + user `ToolResult` pair where BOTH messages
2916    /// have content consisting solely of legacy tool bracket strings (no human-readable text).
2917    ///
2918    /// Before the fix, `content.trim().is_empty()` returned false for these messages, so they
2919    /// were never soft-deleted and the WARN log repeated on every session restart.
2920    ///
2921    /// After the fix, `has_meaningful_content` returns false for legacy-only content, so both
2922    /// orphaned messages are soft-deleted (non-null `deleted_at`) in `SQLite`.
2923    #[tokio::test]
2924    async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
2925        use zeph_llm::provider::MessagePart;
2926
2927        let provider = mock_provider(vec![]);
2928        let channel = MockChannel::new(vec![]);
2929        let registry = create_test_registry();
2930        let executor = MockToolExecutor::no_tools();
2931
2932        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2933        let cid = memory.sqlite().create_conversation().await.unwrap();
2934        let sqlite = memory.sqlite();
2935
2936        // A normal user message that anchors the conversation.
2937        sqlite
2938            .save_message(cid, "user", "save this for me")
2939            .await
2940            .unwrap();
2941
2942        // Orphaned assistant[ToolUse]: content is ONLY a legacy tool tag — no matching
2943        // ToolResult follows. This is the exact pattern that triggered #2529.
2944        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2945            id: "call_2529".to_string(),
2946            name: "memory_save".to_string(),
2947            input: serde_json::json!({"content": "save this"}),
2948        }])
2949        .unwrap();
2950        let orphan_assistant_id = sqlite
2951            .save_message_with_parts(
2952                cid,
2953                "assistant",
2954                "[tool_use: memory_save(call_2529)]",
2955                &use_parts,
2956            )
2957            .await
2958            .unwrap();
2959
2960        // Orphaned user[ToolResult]: content is ONLY a legacy tool tag + body.
2961        // Its tool_use_id ("call_2529") does not match any ToolUse in the preceding assistant
2962        // message in this position (will be made orphaned by inserting a plain assistant message
2963        // between them to break the pair).
2964        sqlite
2965            .save_message(cid, "assistant", "here is a plain reply")
2966            .await
2967            .unwrap();
2968
2969        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2970            tool_use_id: "call_2529".to_string(),
2971            content: "saved".to_string(),
2972            is_error: false,
2973        }])
2974        .unwrap();
2975        let orphan_user_id = sqlite
2976            .save_message_with_parts(
2977                cid,
2978                "user",
2979                "[tool_result: call_2529]\nsaved",
2980                &result_parts,
2981            )
2982            .await
2983            .unwrap();
2984
2985        // Final plain message so history doesn't end on the orphan.
2986        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2987
2988        let memory_arc = std::sync::Arc::new(memory);
2989        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2990            memory_arc.clone(),
2991            cid,
2992            50,
2993            5,
2994            100,
2995        );
2996
2997        agent.load_history().await.unwrap();
2998
2999        // Verify that both orphaned messages now have `deleted_at IS NOT NULL` in SQLite.
3000        // COUNT(*) WHERE deleted_at IS NOT NULL returns 1 if soft-deleted, 0 otherwise.
3001        let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3002            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3003        )
3004        .bind(orphan_assistant_id)
3005        .fetch_all(memory_arc.sqlite().pool())
3006        .await
3007        .unwrap();
3008
3009        let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3010            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3011        )
3012        .bind(orphan_user_id)
3013        .fetch_all(memory_arc.sqlite().pool())
3014        .await
3015        .unwrap();
3016
3017        assert_eq!(
3018            assistant_deleted_count.first().copied().unwrap_or(0),
3019            1,
3020            "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3021        );
3022        assert_eq!(
3023            user_deleted_count.first().copied().unwrap_or(0),
3024            1,
3025            "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3026        );
3027    }
3028
3029    /// #2529 idempotency: after soft-delete on first `load_history`, a second call must not
3030    /// re-load the soft-deleted orphans. This ensures the WARN log does not repeat on the
3031    /// next session startup for the same orphaned messages.
3032    #[tokio::test]
3033    async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3034        use zeph_llm::provider::MessagePart;
3035
3036        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3037        let cid = memory.sqlite().create_conversation().await.unwrap();
3038        let sqlite = memory.sqlite();
3039
3040        // Normal anchor message.
3041        sqlite
3042            .save_message(cid, "user", "do something")
3043            .await
3044            .unwrap();
3045
3046        // Orphaned assistant[ToolUse] with legacy-only content.
3047        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3048            id: "call_idem".to_string(),
3049            name: "shell".to_string(),
3050            input: serde_json::json!({"command": "ls"}),
3051        }])
3052        .unwrap();
3053        sqlite
3054            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3055            .await
3056            .unwrap();
3057
3058        // Break the pair: insert a plain assistant message so the ToolUse is mid-history orphan.
3059        sqlite
3060            .save_message(cid, "assistant", "continuing")
3061            .await
3062            .unwrap();
3063
3064        // Orphaned user[ToolResult] with legacy-only content.
3065        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3066            tool_use_id: "call_idem".to_string(),
3067            content: "output".to_string(),
3068            is_error: false,
3069        }])
3070        .unwrap();
3071        sqlite
3072            .save_message_with_parts(
3073                cid,
3074                "user",
3075                "[tool_result: call_idem]\noutput",
3076                &result_parts,
3077            )
3078            .await
3079            .unwrap();
3080
3081        sqlite
3082            .save_message(cid, "assistant", "final")
3083            .await
3084            .unwrap();
3085
3086        let memory_arc = std::sync::Arc::new(memory);
3087
3088        // First session: load_history performs soft-delete of the orphaned pair.
3089        let mut agent1 = Agent::new(
3090            mock_provider(vec![]),
3091            MockChannel::new(vec![]),
3092            create_test_registry(),
3093            None,
3094            5,
3095            MockToolExecutor::no_tools(),
3096        )
3097        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3098        agent1.load_history().await.unwrap();
3099        let count_after_first = agent1.msg.messages.len();
3100
3101        // Second session: a fresh agent loading the same conversation must not see the
3102        // soft-deleted orphans — the WARN log must not repeat.
3103        let mut agent2 = Agent::new(
3104            mock_provider(vec![]),
3105            MockChannel::new(vec![]),
3106            create_test_registry(),
3107            None,
3108            5,
3109            MockToolExecutor::no_tools(),
3110        )
3111        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3112        agent2.load_history().await.unwrap();
3113        let count_after_second = agent2.msg.messages.len();
3114
3115        // Both sessions must load the same number of messages — soft-deleted orphans excluded.
3116        assert_eq!(
3117            count_after_first, count_after_second,
3118            "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3119        );
3120    }
3121
3122    /// Edge case for #2529: an orphaned assistant message whose content has BOTH meaningful text
3123    /// AND a `tool_use` tag must NOT be soft-deleted. The `ToolUse` parts are stripped but the
3124    /// message is kept because it has human-readable content.
3125    #[tokio::test]
3126    async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3127        use zeph_llm::provider::MessagePart;
3128
3129        let provider = mock_provider(vec![]);
3130        let channel = MockChannel::new(vec![]);
3131        let registry = create_test_registry();
3132        let executor = MockToolExecutor::no_tools();
3133
3134        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3135        let cid = memory.sqlite().create_conversation().await.unwrap();
3136        let sqlite = memory.sqlite();
3137
3138        // Normal first exchange.
3139        sqlite
3140            .save_message(cid, "user", "check the files")
3141            .await
3142            .unwrap();
3143
3144        // Assistant message: has BOTH meaningful text AND a ToolUse part.
3145        // Content contains real prose + legacy tag — has_meaningful_content must return true.
3146        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3147            id: "call_mixed".to_string(),
3148            name: "shell".to_string(),
3149            input: serde_json::json!({"command": "ls"}),
3150        }])
3151        .unwrap();
3152        sqlite
3153            .save_message_with_parts(
3154                cid,
3155                "assistant",
3156                "Let me list the directory. [tool_use: shell(call_mixed)]",
3157                &use_parts,
3158            )
3159            .await
3160            .unwrap();
3161
3162        // No matching ToolResult follows — the ToolUse is orphaned.
3163        sqlite.save_message(cid, "user", "thanks").await.unwrap();
3164        sqlite
3165            .save_message(cid, "assistant", "you are welcome")
3166            .await
3167            .unwrap();
3168
3169        let memory_arc = std::sync::Arc::new(memory);
3170        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3171            memory_arc.clone(),
3172            cid,
3173            50,
3174            5,
3175            100,
3176        );
3177
3178        let messages_before = agent.msg.messages.len();
3179        agent.load_history().await.unwrap();
3180
3181        // All 4 messages must be present — the mixed-content assistant message is KEPT.
3182        assert_eq!(
3183            agent.msg.messages.len(),
3184            messages_before + 4,
3185            "assistant message with text + tool tag must not be removed after ToolUse strip"
3186        );
3187
3188        // The mixed-content assistant message must have its ToolUse parts stripped.
3189        let mixed_msg = agent
3190            .msg
3191            .messages
3192            .iter()
3193            .find(|m| m.content.contains("Let me list the directory"))
3194            .expect("mixed-content assistant message must still be in history");
3195        assert!(
3196            !mixed_msg
3197                .parts
3198                .iter()
3199                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3200            "orphaned ToolUse parts must be stripped even when message has meaningful text"
3201        );
3202        assert_eq!(
3203            mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3204            "content field must be unchanged — only parts are stripped"
3205        );
3206    }
3207
3208    // ── [skipped]/[stopped] ToolResult embedding guard (#2620) ──────────────
3209
3210    #[tokio::test]
3211    async fn persist_message_skipped_tool_result_does_not_embed() {
3212        use zeph_llm::provider::MessagePart;
3213
3214        let provider = mock_provider(vec![]);
3215        let channel = MockChannel::new(vec![]);
3216        let registry = create_test_registry();
3217        let executor = MockToolExecutor::no_tools();
3218
3219        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3220        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3221        let cid = memory.sqlite().create_conversation().await.unwrap();
3222
3223        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3224            .with_metrics(tx)
3225            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3226        agent.services.memory.persistence.autosave_assistant = true;
3227        agent.services.memory.persistence.autosave_min_length = 0;
3228
3229        let parts = vec![MessagePart::ToolResult {
3230            tool_use_id: "tu1".into(),
3231            content: "[skipped] bash tool was blocked by utility gate".into(),
3232            is_error: false,
3233        }];
3234
3235        agent
3236            .persist_message(
3237                Role::User,
3238                "[skipped] bash tool was blocked by utility gate",
3239                &parts,
3240                false,
3241            )
3242            .await;
3243
3244        assert_eq!(
3245            rx.borrow().embeddings_generated,
3246            0,
3247            "[skipped] ToolResult must not be embedded into Qdrant"
3248        );
3249    }
3250
3251    #[tokio::test]
3252    async fn persist_message_stopped_tool_result_does_not_embed() {
3253        use zeph_llm::provider::MessagePart;
3254
3255        let provider = mock_provider(vec![]);
3256        let channel = MockChannel::new(vec![]);
3257        let registry = create_test_registry();
3258        let executor = MockToolExecutor::no_tools();
3259
3260        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3261        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3262        let cid = memory.sqlite().create_conversation().await.unwrap();
3263
3264        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3265            .with_metrics(tx)
3266            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3267        agent.services.memory.persistence.autosave_assistant = true;
3268        agent.services.memory.persistence.autosave_min_length = 0;
3269
3270        let parts = vec![MessagePart::ToolResult {
3271            tool_use_id: "tu2".into(),
3272            content: "[stopped] execution limit reached".into(),
3273            is_error: false,
3274        }];
3275
3276        agent
3277            .persist_message(
3278                Role::User,
3279                "[stopped] execution limit reached",
3280                &parts,
3281                false,
3282            )
3283            .await;
3284
3285        assert_eq!(
3286            rx.borrow().embeddings_generated,
3287            0,
3288            "[stopped] ToolResult must not be embedded into Qdrant"
3289        );
3290    }
3291
3292    #[tokio::test]
3293    async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3294        // Regression: a normal ToolResult (no [skipped]/[stopped] prefix) must not be
3295        // suppressed by the utility-gate guard and must reach the save path (SQLite).
3296        use zeph_llm::provider::MessagePart;
3297
3298        let provider = mock_provider(vec![]);
3299        let channel = MockChannel::new(vec![]);
3300        let registry = create_test_registry();
3301        let executor = MockToolExecutor::no_tools();
3302
3303        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3304        let cid = memory.sqlite().create_conversation().await.unwrap();
3305        let memory_arc = std::sync::Arc::new(memory);
3306
3307        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3308            memory_arc.clone(),
3309            cid,
3310            50,
3311            5,
3312            100,
3313        );
3314        agent.services.memory.persistence.autosave_assistant = true;
3315        agent.services.memory.persistence.autosave_min_length = 0;
3316
3317        let content = "total 42\ndrwxr-xr-x  5 user group";
3318        let parts = vec![MessagePart::ToolResult {
3319            tool_use_id: "tu3".into(),
3320            content: content.into(),
3321            is_error: false,
3322        }];
3323
3324        agent
3325            .persist_message(Role::User, content, &parts, false)
3326            .await;
3327
3328        // Must be saved to SQLite — confirms the guard did not block this path.
3329        let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3330        assert_eq!(
3331            history.len(),
3332            1,
3333            "normal ToolResult must be saved to SQLite"
3334        );
3335        assert_eq!(history[0].content, content);
3336    }
3337
3338    /// Verify that `enqueue_trajectory_extraction_task` uses a bounded tail slice instead of
3339    /// cloning the full message vec. We confirm the slice logic by checking that the
3340    /// `tail_start` calculation correctly bounds the window even with more messages than
3341    /// `max_messages`.
3342    #[test]
3343    fn trajectory_extraction_slice_bounds_messages() {
3344        // Replicate the slice logic from enqueue_trajectory_extraction_task.
3345        let max_messages: usize = 20;
3346        let total_messages = 100usize;
3347
3348        let tail_start = total_messages.saturating_sub(max_messages);
3349        let window = total_messages - tail_start;
3350
3351        assert_eq!(
3352            window, 20,
3353            "slice should contain exactly max_messages items"
3354        );
3355        assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3356    }
3357
3358    #[test]
3359    fn trajectory_extraction_slice_handles_few_messages() {
3360        let max_messages: usize = 20;
3361        let total_messages = 5usize;
3362
3363        let tail_start = total_messages.saturating_sub(max_messages);
3364        let window = total_messages - tail_start;
3365
3366        assert_eq!(window, 5, "should return all messages when fewer than max");
3367        assert_eq!(tail_start, 0, "slice should start from the beginning");
3368    }
3369
3370    // --- #3168 regression tests ---
3371
3372    /// Round-trip: persist `Assistant[tool_use]` + `User[tool_result]`, then `load_history`.
3373    /// After `sanitize_tool_pairs` the pair must be intact — no WARN, both messages present
3374    /// with non-empty parts.
3375    #[tokio::test]
3376    async fn regression_3168_complete_tool_pair_survives_round_trip() {
3377        use zeph_llm::provider::MessagePart;
3378
3379        let provider = mock_provider(vec![]);
3380        let channel = MockChannel::new(vec![]);
3381        let registry = create_test_registry();
3382        let executor = MockToolExecutor::no_tools();
3383
3384        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3385        let cid = memory.sqlite().create_conversation().await.unwrap();
3386        let sqlite = memory.sqlite();
3387
3388        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3389            id: "r3168_call".to_string(),
3390            name: "shell".to_string(),
3391            input: serde_json::json!({"command": "echo hi"}),
3392        }])
3393        .unwrap();
3394        sqlite
3395            .save_message_with_parts(
3396                cid,
3397                "assistant",
3398                "[tool_use: shell(r3168_call)]",
3399                &use_parts,
3400            )
3401            .await
3402            .unwrap();
3403
3404        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3405            tool_use_id: "r3168_call".to_string(),
3406            content: "[skipped]".to_string(),
3407            is_error: false,
3408        }])
3409        .unwrap();
3410        sqlite
3411            .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3412            .await
3413            .unwrap();
3414
3415        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3416            std::sync::Arc::new(memory),
3417            cid,
3418            50,
3419            5,
3420            100,
3421        );
3422
3423        let base = agent.msg.messages.len();
3424        agent.load_history().await.unwrap();
3425
3426        assert_eq!(
3427            agent.msg.messages.len(),
3428            base + 2,
3429            "both messages of the complete pair must survive load_history"
3430        );
3431
3432        let assistant_msg = agent
3433            .msg
3434            .messages
3435            .iter()
3436            .find(|m| m.role == Role::Assistant)
3437            .expect("assistant message missing after load_history");
3438        assert!(
3439            assistant_msg
3440                .parts
3441                .iter()
3442                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3443            "ToolUse part must be preserved in assistant message"
3444        );
3445
3446        let user_msg = agent
3447            .msg
3448            .messages
3449            .iter()
3450            .rev()
3451            .find(|m| m.role == Role::User)
3452            .expect("user message missing after load_history");
3453        assert!(
3454            user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3455            "ToolResult part must be preserved in user message"
3456        );
3457    }
3458
3459    /// If parts serialization fails, `persist_message` must return early and not store
3460    /// a row with empty parts that would create an orphaned `tool_use` on next session load.
3461    /// We verify this by writing a row with invalid `parts_json` directly and confirming
3462    /// `load_history` skips it (empty parts row is not injected into the agent).
3463    #[tokio::test]
3464    async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3465        use zeph_llm::provider::MessagePart;
3466
3467        let provider = mock_provider(vec![]);
3468        let channel = MockChannel::new(vec![]);
3469        let registry = create_test_registry();
3470        let executor = MockToolExecutor::no_tools();
3471
3472        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3473        let cid = memory.sqlite().create_conversation().await.unwrap();
3474        let sqlite = memory.sqlite();
3475
3476        // Simulate the pre-fix bug: assistant message stored with empty parts_json "[]"
3477        // even though it should have had a ToolUse part. This is what persist_message used
3478        // to do before the early-return fix.
3479        sqlite
3480            .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3481            .await
3482            .unwrap();
3483
3484        // User message with the matching tool_result stored correctly.
3485        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3486            tool_use_id: "corrupt".to_string(),
3487            content: "result".to_string(),
3488            is_error: false,
3489        }])
3490        .unwrap();
3491        sqlite
3492            .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3493            .await
3494            .unwrap();
3495
3496        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3497            std::sync::Arc::new(memory),
3498            cid,
3499            50,
3500            5,
3501            100,
3502        );
3503
3504        let base = agent.msg.messages.len();
3505        agent.load_history().await.unwrap();
3506
3507        // The user ToolResult has no matching ToolUse in the assistant message (parts="[]"),
3508        // so sanitize_tool_pairs must strip the orphaned ToolResult.
3509        // Net result: only the content-only assistant message survives; user msg is removed
3510        // because after stripping its only part it becomes empty.
3511        let loaded = agent.msg.messages.len() - base;
3512        // No message injected with orphaned ToolResult parts — either stripped entirely or
3513        // the ToolResult part removed. Verify no user message with ToolResult remains.
3514        let orphan_present = agent.msg.messages.iter().any(|m| {
3515            m.role == Role::User
3516                && m.parts.iter().any(|p| {
3517                    matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3518                })
3519        });
3520        assert!(
3521            !orphan_present,
3522            "orphaned ToolResult must not survive load_history; loaded={loaded}"
3523        );
3524    }
3525}