Skip to main content

zeph_core/agent/
persistence.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::channel::Channel;
5use zeph_agent_persistence::graph::{build_graph_extraction_config, collect_context_messages};
6use zeph_agent_persistence::{
7    MemoryPersistenceView, MetricsView, PersistMessageRequest, PersistenceService, SecurityView,
8};
9use zeph_llm::provider::{LlmProvider as _, MessagePart, Role};
10
11use super::Agent;
12
13impl<C: Channel> Agent<C> {
14    /// Load conversation history from memory and inject into messages.
15    ///
16    /// Delegates to [`PersistenceService::load_history`]. Post-load operations that touch
17    /// agent-internal singletons (session count increment, semantic fact count recompute,
18    /// token recompute) remain in this shim because they access fields outside the
19    /// borrow-lens view.
20    ///
21    /// # Errors
22    ///
23    /// Returns an error if loading history from `SQLite` fails.
24    ///
25    /// # Panics
26    ///
27    /// Does not panic. The internal `unwrap_or(0)` conversions are on fallible `i64 → usize`
28    /// casts that saturate to zero on overflow; they cannot panic.
29    pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
30        let (Some(memory), Some(cid)) = (
31            self.services.memory.persistence.memory.as_ref(),
32            self.services.memory.persistence.conversation_id,
33        ) else {
34            return Ok(());
35        };
36
37        // Clone so we can call methods after the borrow-lens view is dropped.
38        let memory = memory.clone();
39
40        let mut unsummarized = self.services.memory.persistence.unsummarized_count;
41        // `memory_view` is not `mut` — the `&mut unsummarized` inside is established at
42        // construction and passed as `&memory_view` to load_history (shared borrow).
43        let memory_view = MemoryPersistenceView {
44            memory: Some(&memory),
45            conversation_id: self.services.memory.persistence.conversation_id,
46            autosave_assistant: self.services.memory.persistence.autosave_assistant,
47            autosave_min_length: self.services.memory.persistence.autosave_min_length,
48            unsummarized_count: &mut unsummarized,
49            goal_text: self.services.memory.extraction.goal_text.clone(),
50        };
51        let mut sqlite_delta = 0u64;
52        let mut embed_delta = 0u64;
53        let mut guard_delta = 0u64;
54        let mut metrics_view = MetricsView {
55            sqlite_message_count: &mut sqlite_delta,
56            embeddings_generated: &mut embed_delta,
57            exfiltration_memory_guards: &mut guard_delta,
58        };
59
60        let svc = PersistenceService::new();
61        let outcome = svc
62            .load_history(
63                &mut self.msg.messages,
64                &mut self.msg.last_persisted_message_id,
65                &mut self.msg.deferred_db_hide_ids,
66                &mut self.msg.deferred_db_summaries,
67                &memory_view,
68                &zeph_config::Config::default(),
69                &mut metrics_view,
70            )
71            .await
72            .map_err(|e| {
73                super::error::AgentError::Memory(zeph_memory::MemoryError::Other(e.to_string()))
74            })?;
75
76        // Write back lens-borrowed local to the field.
77        self.services.memory.persistence.unsummarized_count = unsummarized;
78
79        if outcome.messages_loaded > 0 {
80            // Increment session counts so tier promotion can track cross-session access.
81            let _ = memory
82                .sqlite()
83                .increment_session_counts_for_conversation(cid)
84                .await
85                .inspect_err(|e| {
86                    tracing::warn!(error = %e, "failed to increment tier session counts");
87                });
88        }
89
90        // Set absolute SQLite message count and semantic fact count (not deltas).
91        self.update_metrics(|m| {
92            m.sqlite_message_count = outcome.sqlite_total_messages;
93        });
94        if let Ok(count) = memory.sqlite().count_semantic_facts().await {
95            let count_u64 = u64::try_from(count).unwrap_or(0);
96            self.update_metrics(|m| {
97                m.semantic_fact_count = count_u64;
98            });
99        }
100        if let Ok(count) = memory.unsummarized_message_count(cid).await {
101            self.services.memory.persistence.unsummarized_count =
102                usize::try_from(count).unwrap_or(0);
103        }
104
105        self.recompute_prompt_tokens();
106        Ok(())
107    }
108
109    /// Persist a message to memory.
110    ///
111    /// `has_injection_flags` controls whether Qdrant embedding is skipped for this message.
112    /// When `true` and `guard_memory_writes` is enabled, only `SQLite` is written — the message
113    /// is saved for conversation continuity but will not pollute semantic search (M2, D2).
114    #[cfg_attr(
115        feature = "profiling",
116        tracing::instrument(name = "agent.persist_message", skip_all)
117    )]
118    pub(crate) async fn persist_message(
119        &mut self,
120        role: Role,
121        content: &str,
122        parts: &[MessagePart],
123        has_injection_flags: bool,
124    ) {
125        // M2: call should_guard_memory_write for its diagnostic side effects (tracing + security
126        // event). The bool result is passed into SecurityView so the service can decide whether
127        // to skip Qdrant embedding.
128        let guard_event = self
129            .services
130            .security
131            .exfiltration_guard
132            .should_guard_memory_write(has_injection_flags);
133        if let Some(ref event) = guard_event {
134            tracing::warn!(
135                ?event,
136                "exfiltration guard: skipping Qdrant embedding for flagged content"
137            );
138            self.push_security_event(
139                zeph_common::SecurityEventCategory::ExfiltrationBlock,
140                "memory_write",
141                "Qdrant embedding skipped: flagged content",
142            );
143        }
144
145        let req = PersistMessageRequest::from_borrowed(role, content, parts, has_injection_flags);
146
147        let mut unsummarized = self.services.memory.persistence.unsummarized_count;
148        let memory_arc = self.services.memory.persistence.memory.clone();
149        let mut memory_view = MemoryPersistenceView {
150            memory: memory_arc.as_ref(),
151            conversation_id: self.services.memory.persistence.conversation_id,
152            autosave_assistant: self.services.memory.persistence.autosave_assistant,
153            autosave_min_length: self.services.memory.persistence.autosave_min_length,
154            unsummarized_count: &mut unsummarized,
155            goal_text: self.services.memory.extraction.goal_text.clone(),
156        };
157        let security = SecurityView {
158            guard_memory_writes: guard_event.is_some(),
159            _phantom: std::marker::PhantomData,
160        };
161        let mut sqlite_delta = 0u64;
162        let mut embed_delta = 0u64;
163        let mut guard_delta = 0u64;
164        let mut metrics_view = MetricsView {
165            sqlite_message_count: &mut sqlite_delta,
166            embeddings_generated: &mut embed_delta,
167            exfiltration_memory_guards: &mut guard_delta,
168        };
169
170        let svc = PersistenceService::new();
171        let outcome = svc
172            .persist_message(
173                req,
174                &mut self.msg.last_persisted_message_id,
175                &mut memory_view,
176                &security,
177                &zeph_config::Config::default(),
178                &mut metrics_view,
179            )
180            .await;
181
182        // Write back the unsummarized counter (lens borrowed a local copy).
183        self.services.memory.persistence.unsummarized_count = unsummarized;
184
185        // Forward metric deltas through the watch broadcast.
186        self.update_metrics(|m| {
187            m.sqlite_message_count += sqlite_delta;
188            m.embeddings_generated += embed_delta;
189            // guard_delta is already tracked via push_security_event above.
190            m.exfiltration_memory_guards += guard_delta;
191        });
192
193        if outcome.message_id.is_none() {
194            return;
195        }
196
197        // Phase 2: enqueue enrichment tasks via supervisor (non-blocking).
198        // check_summarization signals completion via SummarizationSignal, consumed in reap()
199        // between turns — no shared mutable state across tasks (S1 fix).
200        self.enqueue_summarization_task();
201
202        // FIX-1: skip graph extraction for tool result messages — they contain raw structured
203        // output (TOML, JSON, code) that pollutes the entity graph with noise.
204        let has_tool_result_parts = parts
205            .iter()
206            .any(|p| matches!(p, MessagePart::ToolResult { .. }));
207
208        self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
209            .await;
210
211        // Persona extraction: run only for user messages that are not tool results and not injected.
212        if role == Role::User && !has_tool_result_parts && !has_injection_flags {
213            self.enqueue_persona_extraction_task();
214        }
215
216        // Trajectory extraction: run after turns that contained tool results.
217        if has_tool_result_parts {
218            self.enqueue_trajectory_extraction_task();
219        }
220
221        // ReasoningBank distillation: runs only after the final assistant message of a turn
222        // (C2 fix: skip intermediate tool-call messages). A message with ToolUse parts is an
223        // intermediate step; the final assistant message has no ToolUse parts.
224        // S-Med1: skip if injection patterns detected — mirrors graph extraction guard.
225        let has_tool_use_parts = parts
226            .iter()
227            .any(|p| matches!(p, MessagePart::ToolUse { .. }));
228        if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
229            self.enqueue_reasoning_extraction_task();
230            // MemCoT distillation: same guards as ReasoningBank.
231            self.enqueue_memcot_distill_task(content);
232        }
233    }
234
235    /// Enqueue `MemCoT` semantic state distillation via the supervisor.
236    ///
237    /// All cost gates (interval, session cap, min chars) are checked inside
238    /// [`crate::agent::memcot::SemanticStateAccumulator::maybe_enqueue_distill`].
239    fn enqueue_memcot_distill_task(&mut self, assistant_content: &str) {
240        let Some(accumulator) = &self.services.memory.extraction.memcot_accumulator else {
241            return;
242        };
243        let distill_provider_name = self
244            .services
245            .memory
246            .extraction
247            .memcot_config
248            .distill_provider
249            .as_str();
250        let provider = self.resolve_background_provider(distill_provider_name);
251
252        let content = assistant_content.to_owned();
253        let supervisor = &mut self.runtime.lifecycle.supervisor;
254
255        accumulator.maybe_enqueue_distill(&content, provider, |name, fut| {
256            supervisor.spawn(super::agent_supervisor::TaskClass::Enrichment, name, fut);
257        });
258    }
259
260    /// Enqueue background summarization via the supervisor (S1 fix: no shared `AtomicUsize`).
261    fn enqueue_summarization_task(&mut self) {
262        let (Some(memory), Some(cid)) = (
263            self.services.memory.persistence.memory.clone(),
264            self.services.memory.persistence.conversation_id,
265        ) else {
266            return;
267        };
268
269        if self.services.memory.persistence.unsummarized_count
270            <= self.services.memory.compaction.summarization_threshold
271        {
272            return;
273        }
274
275        let batch_size = self.services.memory.compaction.summarization_threshold / 2;
276
277        self.runtime.lifecycle.supervisor.spawn_summarization("summarization", async move {
278            match tokio::time::timeout(
279                std::time::Duration::from_secs(30),
280                memory.summarize(cid, batch_size),
281            )
282            .await
283            {
284                Ok(Ok(Some(summary_id))) => {
285                    tracing::info!(
286                        "background summarization: created summary {summary_id} for conversation {cid}"
287                    );
288                    true
289                }
290                Ok(Ok(None)) => {
291                    tracing::debug!("background summarization: no summarization needed");
292                    false
293                }
294                Ok(Err(e)) => {
295                    tracing::error!("background summarization failed: {e:#}");
296                    false
297                }
298                Err(_) => {
299                    tracing::warn!("background summarization timed out after 30s");
300                    false
301                }
302            }
303        });
304    }
305
306    /// Prepare graph extraction guards in foreground, then enqueue heavy work via supervisor.
307    ///
308    /// Guards (enabled check, injection/tool-result skip) stay on the foreground path.
309    /// The RPE check and actual extraction run in background (S2: no `send_status`).
310    async fn enqueue_graph_extraction_task(
311        &mut self,
312        content: &str,
313        has_injection_flags: bool,
314        has_tool_result_parts: bool,
315    ) {
316        if self.services.memory.persistence.memory.is_none()
317            || self.services.memory.persistence.conversation_id.is_none()
318        {
319            return;
320        }
321        if has_tool_result_parts {
322            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
323            return;
324        }
325        if has_injection_flags {
326            tracing::warn!("graph extraction skipped: injection patterns detected in content");
327            return;
328        }
329
330        let cfg = &self.services.memory.extraction.graph_config;
331        if !cfg.enabled {
332            return;
333        }
334        let extraction_cfg = build_graph_extraction_config(
335            cfg,
336            self.services
337                .memory
338                .persistence
339                .conversation_id
340                .map(|c| c.0),
341        );
342        // Resolve a clean provider that bypasses quality_gate for JSON extraction tasks.
343        // When extract_provider is empty, falls back to the primary provider (existing behavior).
344        let extract_provider_name = cfg.extract_provider.as_str().to_owned();
345
346        // RPE check: embed + compute surprise score. Stays on foreground to avoid
347        // capturing the rpe_router mutex in a background task.
348        if self.rpe_should_skip(content).await {
349            tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
350            return;
351        }
352
353        let context_messages = collect_context_messages(&self.msg.messages);
354
355        let Some(memory) = self.services.memory.persistence.memory.clone() else {
356            return;
357        };
358
359        let validator: zeph_memory::semantic::PostExtractValidator =
360            if self.services.security.memory_validator.is_enabled() {
361                let v = self.services.security.memory_validator.clone();
362                Some(Box::new(move |result| {
363                    v.validate_graph_extraction(result)
364                        .map_err(|e| e.to_string())
365                }))
366            } else {
367                None
368            };
369
370        let provider_override = if extract_provider_name.is_empty() {
371            None
372        } else {
373            Some(self.resolve_background_provider(&extract_provider_name))
374        };
375
376        self.spawn_graph_extraction_task(
377            memory,
378            content,
379            context_messages,
380            extraction_cfg,
381            validator,
382            provider_override,
383        );
384
385        // Sync community failures and extraction metrics (cheap, foreground-safe).
386        self.sync_community_detection_failures();
387        self.sync_graph_extraction_metrics();
388        self.enqueue_graph_count_sync_task();
389    }
390
391    fn spawn_graph_extraction_task(
392        &mut self,
393        memory: std::sync::Arc<zeph_memory::semantic::SemanticMemory>,
394        content: &str,
395        context_messages: Vec<String>,
396        extraction_cfg: zeph_memory::semantic::GraphExtractionConfig,
397        validator: zeph_memory::semantic::PostExtractValidator,
398        provider_override: Option<zeph_llm::any::AnyProvider>,
399    ) {
400        let content_owned = content.to_owned();
401        let graph_store = memory.graph_store.clone();
402        let metrics_tx = self.runtime.metrics.metrics_tx.clone();
403        let start_time = self.runtime.lifecycle.start_time;
404
405        self.runtime.lifecycle.supervisor.spawn(
406            super::agent_supervisor::TaskClass::Enrichment,
407            "graph_extraction",
408            async move {
409                let extraction_handle = memory.spawn_graph_extraction(
410                    content_owned,
411                    context_messages,
412                    extraction_cfg,
413                    validator,
414                    provider_override,
415                );
416
417                // After extraction completes, refresh graph count metrics.
418                if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
419                    let _ = extraction_handle.await;
420                    let (entities, edges, communities) = tokio::join!(
421                        store.entity_count(),
422                        store.active_edge_count(),
423                        store.community_count()
424                    );
425                    let elapsed = start_time.elapsed().as_secs();
426                    tx.send_modify(|m| {
427                        m.uptime_seconds = elapsed;
428                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
429                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
430                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
431                    });
432                } else {
433                    let _ = extraction_handle.await;
434                }
435
436                tracing::debug!("background graph extraction complete");
437            },
438        );
439    }
440
441    // sync_graph_counts and sync_guidelines_status are DB reads; enqueued as Telemetry background.
442    fn enqueue_graph_count_sync_task(&mut self) {
443        let memory_for_sync = self.services.memory.persistence.memory.clone();
444        let metrics_tx_sync = self.runtime.metrics.metrics_tx.clone();
445        let start_time_sync = self.runtime.lifecycle.start_time;
446        let cid_sync = self.services.memory.persistence.conversation_id;
447        let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
448        let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
449        let guidelines_enabled = self.services.memory.extraction.graph_config.enabled;
450
451        self.runtime.lifecycle.supervisor.spawn(
452            super::agent_supervisor::TaskClass::Telemetry,
453            "graph_count_sync",
454            async move {
455                let Some(store) = graph_store_sync else {
456                    return;
457                };
458                let Some(tx) = metrics_tx_sync else { return };
459
460                let (entities, edges, communities) = tokio::join!(
461                    store.entity_count(),
462                    store.active_edge_count(),
463                    store.community_count()
464                );
465                let elapsed = start_time_sync.elapsed().as_secs();
466                tx.send_modify(|m| {
467                    m.uptime_seconds = elapsed;
468                    m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
469                    m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
470                    m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
471                });
472
473                // Sync guidelines status.
474                if guidelines_enabled && let Some(sqlite) = sqlite_sync {
475                    match tokio::time::timeout(
476                        std::time::Duration::from_secs(10),
477                        sqlite.load_compression_guidelines_meta(cid_sync),
478                    )
479                    .await
480                    {
481                        Ok(Ok((version, created_at))) => {
482                            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
483                            let version_u32 = u32::try_from(version).unwrap_or(0);
484                            tx.send_modify(|m| {
485                                m.guidelines_version = version_u32;
486                                m.guidelines_updated_at = created_at;
487                            });
488                        }
489                        Ok(Err(e)) => {
490                            tracing::debug!("guidelines status sync failed: {e:#}");
491                        }
492                        Err(_) => {
493                            tracing::debug!("guidelines status sync timed out");
494                        }
495                    }
496                }
497            },
498        );
499    }
500
501    /// Enqueue persona extraction via supervisor (background, no `send_status`).
502    fn enqueue_persona_extraction_task(&mut self) {
503        use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
504
505        let cfg = &self.services.memory.extraction.persona_config;
506        if !cfg.enabled {
507            return;
508        }
509
510        let Some(memory) = &self.services.memory.persistence.memory else {
511            return;
512        };
513
514        let user_messages: Vec<String> = self
515            .msg
516            .messages
517            .iter()
518            .filter(|m| {
519                m.role == Role::User
520                    && !m
521                        .parts
522                        .iter()
523                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
524            })
525            .take(8)
526            .map(|m| {
527                if m.content.len() > 2048 {
528                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
529                } else {
530                    m.content.clone()
531                }
532            })
533            .collect();
534
535        if user_messages.len() < cfg.min_messages {
536            return;
537        }
538
539        let timeout_secs = cfg.extraction_timeout_secs;
540        let extraction_cfg = PersonaExtractionConfig {
541            enabled: cfg.enabled,
542            min_messages: cfg.min_messages,
543            max_messages: cfg.max_messages,
544            extraction_timeout_secs: timeout_secs,
545        };
546
547        let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
548        let store = memory.sqlite().clone();
549        let conversation_id = self
550            .services
551            .memory
552            .persistence
553            .conversation_id
554            .map(|c| c.0);
555
556        self.runtime.lifecycle.supervisor.spawn(
557            super::agent_supervisor::TaskClass::Enrichment,
558            "persona_extraction",
559            async move {
560                let user_message_refs: Vec<&str> =
561                    user_messages.iter().map(String::as_str).collect();
562                let fut = extract_persona_facts(
563                    &store,
564                    &provider,
565                    &user_message_refs,
566                    &extraction_cfg,
567                    conversation_id,
568                );
569                match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
570                {
571                    Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
572                    Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
573                    Err(_) => tracing::warn!(
574                        timeout_secs,
575                        "persona extraction timed out — no facts written this turn"
576                    ),
577                }
578            },
579        );
580    }
581
582    /// Enqueue trajectory extraction via supervisor (background).
583    fn enqueue_trajectory_extraction_task(&mut self) {
584        use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
585
586        let cfg = self.services.memory.extraction.trajectory_config.clone();
587        if !cfg.enabled {
588            return;
589        }
590
591        let Some(memory) = &self.services.memory.persistence.memory else {
592            return;
593        };
594
595        let conversation_id = match self.services.memory.persistence.conversation_id {
596            Some(cid) => cid.0,
597            None => return,
598        };
599
600        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
601        let turn_messages: Vec<zeph_llm::provider::Message> =
602            self.msg.messages[tail_start..].to_vec();
603
604        if turn_messages.is_empty() {
605            return;
606        }
607
608        let extraction_cfg = TrajectoryExtractionConfig {
609            enabled: cfg.enabled,
610            max_messages: cfg.max_messages,
611            extraction_timeout_secs: cfg.extraction_timeout_secs,
612        };
613
614        let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
615        let store = memory.sqlite().clone();
616        let min_confidence = cfg.min_confidence;
617
618        self.runtime.lifecycle.supervisor.spawn(
619            super::agent_supervisor::TaskClass::Enrichment,
620            "trajectory_extraction",
621            async move {
622                let entries =
623                    match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
624                        .await
625                    {
626                        Ok(e) => e,
627                        Err(e) => {
628                            tracing::warn!(error = %e, "trajectory extraction failed");
629                            return;
630                        }
631                    };
632
633                let last_id = store
634                    .trajectory_last_extracted_message_id(conversation_id)
635                    .await
636                    .unwrap_or(0);
637
638                let mut max_id = last_id;
639                for entry in &entries {
640                    if entry.confidence < min_confidence {
641                        continue;
642                    }
643                    let tools_json = serde_json::to_string(&entry.tools_used)
644                        .unwrap_or_else(|_| "[]".to_string());
645                    match store
646                        .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
647                            conversation_id: Some(conversation_id),
648                            turn_index: 0,
649                            kind: &entry.kind,
650                            intent: &entry.intent,
651                            outcome: &entry.outcome,
652                            tools_used: &tools_json,
653                            confidence: entry.confidence,
654                        })
655                        .await
656                    {
657                        Ok(id) => {
658                            if id > max_id {
659                                max_id = id;
660                            }
661                        }
662                        Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
663                    }
664                }
665
666                if max_id > last_id {
667                    let _ = store
668                        .set_trajectory_last_extracted_message_id(conversation_id, max_id)
669                        .await;
670                }
671
672                tracing::debug!(
673                    count = entries.len(),
674                    conversation_id,
675                    "trajectory extraction complete"
676                );
677            },
678        );
679    }
680
681    /// Enqueue reasoning strategy distillation via supervisor (background, fire-and-forget).
682    ///
683    /// Mirrors [`Self::enqueue_trajectory_extraction_task`]. Runs after every assistant turn
684    /// when `memory.reasoning.enabled = true` and a `ReasoningMemory` is attached.
685    fn enqueue_reasoning_extraction_task(&mut self) {
686        let cfg = self.services.memory.extraction.reasoning_config.clone();
687        if !cfg.enabled {
688            return;
689        }
690
691        let Some(memory) = &self.services.memory.persistence.memory else {
692            return;
693        };
694
695        let Some(reasoning) = memory.reasoning.clone() else {
696            return;
697        };
698
699        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
700        let turn_messages: Vec<zeph_llm::provider::Message> =
701            self.msg.messages[tail_start..].to_vec();
702
703        if turn_messages.len() < cfg.min_messages {
704            return;
705        }
706
707        let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
708        let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
709        let embed_provider = memory.effective_embed_provider().clone();
710        let store_limit = cfg.store_limit;
711        let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
712        let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
713        let self_judge_window = cfg.self_judge_window;
714        let min_assistant_chars = cfg.min_assistant_chars;
715
716        self.runtime.lifecycle.supervisor.spawn(
717            super::agent_supervisor::TaskClass::Enrichment,
718            "reasoning_extraction",
719            async move {
720                if let Err(e) = zeph_memory::process_reasoning_turn(
721                    &reasoning,
722                    &extract_provider,
723                    &distill_provider,
724                    &embed_provider,
725                    &turn_messages,
726                    zeph_memory::ProcessTurnConfig {
727                        store_limit,
728                        extraction_timeout,
729                        distill_timeout,
730                        self_judge_window,
731                        min_assistant_chars,
732                    },
733                )
734                .await
735                {
736                    tracing::warn!(error = %e, "reasoning: process_turn failed");
737                }
738
739                tracing::debug!("reasoning extraction complete");
740            },
741        );
742    }
743
744    /// D-MEM RPE check: returns `true` when the current turn should skip graph extraction.
745    ///
746    /// Embeds `content`, computes RPE via the router, and updates the router state.
747    /// Returns `false` (do not skip) on any error — conservative fallback.
748    async fn rpe_should_skip(&mut self, content: &str) -> bool {
749        let Some(ref rpe_mutex) = self.services.memory.extraction.rpe_router else {
750            return false;
751        };
752        let Some(memory) = &self.services.memory.persistence.memory else {
753            return false;
754        };
755        let candidates = zeph_memory::extract_candidate_entities(content);
756        let provider = memory.provider();
757        let Ok(Ok(emb_vec)) =
758            tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
759        else {
760            return false; // embed failed/timed out → extract
761        };
762        if let Ok(mut router) = rpe_mutex.lock() {
763            let signal = router.compute(&emb_vec, &candidates);
764            router.push_embedding(emb_vec);
765            router.push_entities(&candidates);
766            !signal.should_extract
767        } else {
768            tracing::warn!("rpe_router mutex poisoned; falling through to extract");
769            false
770        }
771    }
772}
773
774#[cfg(test)]
775mod tests {
776    use super::super::agent_tests::{
777        MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
778    };
779    use super::*;
780    use zeph_llm::any::AnyProvider;
781    use zeph_llm::provider::Message;
782    use zeph_memory::semantic::SemanticMemory;
783
784    async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
785        SemanticMemory::new(
786            ":memory:",
787            "http://127.0.0.1:1",
788            None,
789            provider.clone(),
790            "test-model",
791        )
792        .await
793        .unwrap()
794    }
795
796    #[tokio::test]
797    async fn load_history_without_memory_returns_ok() {
798        let provider = mock_provider(vec![]);
799        let channel = MockChannel::new(vec![]);
800        let registry = create_test_registry();
801        let executor = MockToolExecutor::no_tools();
802        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
803
804        let result = agent.load_history().await;
805        assert!(result.is_ok());
806        // No messages added when no memory is configured
807        assert_eq!(agent.msg.messages.len(), 1); // system prompt only
808    }
809
810    #[tokio::test]
811    async fn load_history_with_messages_injects_into_agent() {
812        let provider = mock_provider(vec![]);
813        let channel = MockChannel::new(vec![]);
814        let registry = create_test_registry();
815        let executor = MockToolExecutor::no_tools();
816
817        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
818        let cid = memory.sqlite().create_conversation().await.unwrap();
819
820        memory
821            .sqlite()
822            .save_message(cid, "user", "hello from history")
823            .await
824            .unwrap();
825        memory
826            .sqlite()
827            .save_message(cid, "assistant", "hi back")
828            .await
829            .unwrap();
830
831        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
832            std::sync::Arc::new(memory),
833            cid,
834            50,
835            5,
836            100,
837        );
838
839        let messages_before = agent.msg.messages.len();
840        agent.load_history().await.unwrap();
841        // Two messages were added from history
842        assert_eq!(agent.msg.messages.len(), messages_before + 2);
843    }
844
845    #[tokio::test]
846    async fn load_history_skips_empty_messages() {
847        let provider = mock_provider(vec![]);
848        let channel = MockChannel::new(vec![]);
849        let registry = create_test_registry();
850        let executor = MockToolExecutor::no_tools();
851
852        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
853        let cid = memory.sqlite().create_conversation().await.unwrap();
854
855        // Save an empty message (should be skipped) and a valid one
856        memory
857            .sqlite()
858            .save_message(cid, "user", "   ")
859            .await
860            .unwrap();
861        memory
862            .sqlite()
863            .save_message(cid, "user", "real message")
864            .await
865            .unwrap();
866
867        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
868            std::sync::Arc::new(memory),
869            cid,
870            50,
871            5,
872            100,
873        );
874
875        let messages_before = agent.msg.messages.len();
876        agent.load_history().await.unwrap();
877        // Only the non-empty message is loaded
878        assert_eq!(agent.msg.messages.len(), messages_before + 1);
879    }
880
881    #[tokio::test]
882    async fn load_history_with_empty_store_returns_ok() {
883        let provider = mock_provider(vec![]);
884        let channel = MockChannel::new(vec![]);
885        let registry = create_test_registry();
886        let executor = MockToolExecutor::no_tools();
887
888        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
889        let cid = memory.sqlite().create_conversation().await.unwrap();
890
891        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
892            std::sync::Arc::new(memory),
893            cid,
894            50,
895            5,
896            100,
897        );
898
899        let messages_before = agent.msg.messages.len();
900        agent.load_history().await.unwrap();
901        // No messages added — empty history
902        assert_eq!(agent.msg.messages.len(), messages_before);
903    }
904
905    #[tokio::test]
906    async fn load_history_increments_session_count_for_existing_messages() {
907        let provider = mock_provider(vec![]);
908        let channel = MockChannel::new(vec![]);
909        let registry = create_test_registry();
910        let executor = MockToolExecutor::no_tools();
911
912        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
913        let cid = memory.sqlite().create_conversation().await.unwrap();
914
915        // Save two messages — they start with session_count = 0.
916        let id1 = memory
917            .sqlite()
918            .save_message(cid, "user", "hello")
919            .await
920            .unwrap();
921        let id2 = memory
922            .sqlite()
923            .save_message(cid, "assistant", "hi")
924            .await
925            .unwrap();
926
927        let memory_arc = std::sync::Arc::new(memory);
928        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
929            memory_arc.clone(),
930            cid,
931            50,
932            5,
933            100,
934        );
935
936        agent.load_history().await.unwrap();
937
938        // Both episodic messages must have session_count = 1 after restore.
939        let counts: Vec<i64> = zeph_db::query_scalar(
940            "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
941        )
942        .bind(id1)
943        .bind(id2)
944        .fetch_all(memory_arc.sqlite().pool())
945        .await
946        .unwrap();
947        assert_eq!(
948            counts,
949            vec![1, 1],
950            "session_count must be 1 after first restore"
951        );
952    }
953
954    #[tokio::test]
955    async fn load_history_does_not_increment_session_count_for_new_conversation() {
956        let provider = mock_provider(vec![]);
957        let channel = MockChannel::new(vec![]);
958        let registry = create_test_registry();
959        let executor = MockToolExecutor::no_tools();
960
961        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
962        let cid = memory.sqlite().create_conversation().await.unwrap();
963
964        // No messages saved — empty conversation.
965        let memory_arc = std::sync::Arc::new(memory);
966        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
967            memory_arc.clone(),
968            cid,
969            50,
970            5,
971            100,
972        );
973
974        agent.load_history().await.unwrap();
975
976        // No rows → no session_count increments → query returns empty.
977        let counts: Vec<i64> =
978            zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
979                .bind(cid)
980                .fetch_all(memory_arc.sqlite().pool())
981                .await
982                .unwrap();
983        assert!(counts.is_empty(), "new conversation must have no messages");
984    }
985
986    #[tokio::test]
987    async fn persist_message_without_memory_silently_returns() {
988        // No memory configured — persist_message must not panic
989        let provider = mock_provider(vec![]);
990        let channel = MockChannel::new(vec![]);
991        let registry = create_test_registry();
992        let executor = MockToolExecutor::no_tools();
993        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
994
995        // Must not panic and must complete
996        agent.persist_message(Role::User, "hello", &[], false).await;
997    }
998
999    #[tokio::test]
1000    async fn persist_message_assistant_autosave_false_uses_save_only() {
1001        let provider = mock_provider(vec![]);
1002        let channel = MockChannel::new(vec![]);
1003        let registry = create_test_registry();
1004        let executor = MockToolExecutor::no_tools();
1005
1006        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1007        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1008        let cid = memory.sqlite().create_conversation().await.unwrap();
1009
1010        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1011            .with_metrics(tx)
1012            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1013        agent.services.memory.persistence.autosave_assistant = false;
1014        agent.services.memory.persistence.autosave_min_length = 20;
1015
1016        agent
1017            .persist_message(Role::Assistant, "short assistant reply", &[], false)
1018            .await;
1019
1020        let history = agent
1021            .services
1022            .memory
1023            .persistence
1024            .memory
1025            .as_ref()
1026            .unwrap()
1027            .sqlite()
1028            .load_history(cid, 50)
1029            .await
1030            .unwrap();
1031        assert_eq!(history.len(), 1, "message must be saved");
1032        assert_eq!(history[0].content, "short assistant reply");
1033        // embeddings_generated must remain 0 — save_only path does not embed
1034        assert_eq!(rx.borrow().embeddings_generated, 0);
1035    }
1036
1037    #[tokio::test]
1038    async fn persist_message_assistant_below_min_length_uses_save_only() {
1039        let provider = mock_provider(vec![]);
1040        let channel = MockChannel::new(vec![]);
1041        let registry = create_test_registry();
1042        let executor = MockToolExecutor::no_tools();
1043
1044        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1045        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1046        let cid = memory.sqlite().create_conversation().await.unwrap();
1047
1048        // autosave_assistant=true but min_length=1000 — short content falls back to save_only
1049        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1050            .with_metrics(tx)
1051            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1052        agent.services.memory.persistence.autosave_assistant = true;
1053        agent.services.memory.persistence.autosave_min_length = 1000;
1054
1055        agent
1056            .persist_message(Role::Assistant, "too short", &[], false)
1057            .await;
1058
1059        let history = agent
1060            .services
1061            .memory
1062            .persistence
1063            .memory
1064            .as_ref()
1065            .unwrap()
1066            .sqlite()
1067            .load_history(cid, 50)
1068            .await
1069            .unwrap();
1070        assert_eq!(history.len(), 1, "message must be saved");
1071        assert_eq!(history[0].content, "too short");
1072        assert_eq!(rx.borrow().embeddings_generated, 0);
1073    }
1074
1075    #[tokio::test]
1076    async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1077        // content.len() == autosave_min_length → should_embed = true (>= boundary).
1078        let provider = mock_provider(vec![]);
1079        let channel = MockChannel::new(vec![]);
1080        let registry = create_test_registry();
1081        let executor = MockToolExecutor::no_tools();
1082
1083        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1084        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1085        let cid = memory.sqlite().create_conversation().await.unwrap();
1086
1087        let min_length = 10usize;
1088        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1089            .with_metrics(tx)
1090            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1091        agent.services.memory.persistence.autosave_assistant = true;
1092        agent.services.memory.persistence.autosave_min_length = min_length;
1093
1094        // Exact boundary: len == min_length → embed path.
1095        let content_at_boundary = "A".repeat(min_length);
1096        assert_eq!(content_at_boundary.len(), min_length);
1097        agent
1098            .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1099            .await;
1100
1101        // sqlite_message_count must be incremented regardless of embedding success.
1102        assert_eq!(rx.borrow().sqlite_message_count, 1);
1103    }
1104
1105    #[tokio::test]
1106    async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1107        // content.len() == autosave_min_length - 1 → should_embed = false (below boundary).
1108        let provider = mock_provider(vec![]);
1109        let channel = MockChannel::new(vec![]);
1110        let registry = create_test_registry();
1111        let executor = MockToolExecutor::no_tools();
1112
1113        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1114        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1115        let cid = memory.sqlite().create_conversation().await.unwrap();
1116
1117        let min_length = 10usize;
1118        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1119            .with_metrics(tx)
1120            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1121        agent.services.memory.persistence.autosave_assistant = true;
1122        agent.services.memory.persistence.autosave_min_length = min_length;
1123
1124        // One below boundary: len == min_length - 1 → save_only path, no embedding.
1125        let content_below_boundary = "A".repeat(min_length - 1);
1126        assert_eq!(content_below_boundary.len(), min_length - 1);
1127        agent
1128            .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1129            .await;
1130
1131        let history = agent
1132            .services
1133            .memory
1134            .persistence
1135            .memory
1136            .as_ref()
1137            .unwrap()
1138            .sqlite()
1139            .load_history(cid, 50)
1140            .await
1141            .unwrap();
1142        assert_eq!(history.len(), 1, "message must still be saved");
1143        // save_only path does not embed.
1144        assert_eq!(rx.borrow().embeddings_generated, 0);
1145    }
1146
1147    #[tokio::test]
1148    async fn persist_message_increments_unsummarized_count() {
1149        let provider = mock_provider(vec![]);
1150        let channel = MockChannel::new(vec![]);
1151        let registry = create_test_registry();
1152        let executor = MockToolExecutor::no_tools();
1153
1154        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1155        let cid = memory.sqlite().create_conversation().await.unwrap();
1156
1157        // threshold=100 ensures no summarization is triggered
1158        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1159            std::sync::Arc::new(memory),
1160            cid,
1161            50,
1162            5,
1163            100,
1164        );
1165
1166        assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1167
1168        agent.persist_message(Role::User, "first", &[], false).await;
1169        assert_eq!(agent.services.memory.persistence.unsummarized_count, 1);
1170
1171        agent
1172            .persist_message(Role::User, "second", &[], false)
1173            .await;
1174        assert_eq!(agent.services.memory.persistence.unsummarized_count, 2);
1175    }
1176
1177    #[tokio::test]
1178    async fn check_summarization_resets_counter_on_success() {
1179        let provider = mock_provider(vec![]);
1180        let channel = MockChannel::new(vec![]);
1181        let registry = create_test_registry();
1182        let executor = MockToolExecutor::no_tools();
1183
1184        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1185        let cid = memory.sqlite().create_conversation().await.unwrap();
1186
1187        // threshold=1 so the second persist triggers summarization check (count > threshold)
1188        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1189            std::sync::Arc::new(memory),
1190            cid,
1191            50,
1192            5,
1193            1,
1194        );
1195
1196        agent.persist_message(Role::User, "msg1", &[], false).await;
1197        agent.persist_message(Role::User, "msg2", &[], false).await;
1198
1199        // After summarization attempt (summarize returns Ok(None) since no messages qualify),
1200        // the counter is NOT reset to 0 — only reset on Ok(Some(_)).
1201        // This verifies check_summarization is called and the guard condition works.
1202        // unsummarized_count must be >= 2 before any summarization or 0 if summarization ran.
1203        assert!(agent.services.memory.persistence.unsummarized_count <= 2);
1204    }
1205
1206    #[tokio::test]
1207    async fn unsummarized_count_not_incremented_without_memory() {
1208        let provider = mock_provider(vec![]);
1209        let channel = MockChannel::new(vec![]);
1210        let registry = create_test_registry();
1211        let executor = MockToolExecutor::no_tools();
1212        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1213
1214        agent.persist_message(Role::User, "hello", &[], false).await;
1215        // No memory configured — persist_message returns early, counter must stay 0.
1216        assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1217    }
1218
1219    // R-CRIT-01: unit tests for enqueue_graph_extraction_task guard conditions.
1220    mod graph_extraction_guards {
1221        use super::*;
1222        use crate::config::GraphConfig;
1223        use zeph_llm::provider::MessageMetadata;
1224        use zeph_memory::graph::GraphStore;
1225
1226        fn enabled_graph_config() -> GraphConfig {
1227            GraphConfig {
1228                enabled: true,
1229                ..GraphConfig::default()
1230            }
1231        }
1232
1233        async fn agent_with_graph(
1234            provider: &AnyProvider,
1235            config: GraphConfig,
1236        ) -> Agent<MockChannel> {
1237            let memory =
1238                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1239            let cid = memory.sqlite().create_conversation().await.unwrap();
1240            Agent::new(
1241                provider.clone(),
1242                MockChannel::new(vec![]),
1243                create_test_registry(),
1244                None,
1245                5,
1246                MockToolExecutor::no_tools(),
1247            )
1248            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1249            .with_graph_config(config)
1250        }
1251
1252        #[tokio::test]
1253        async fn injection_flag_guard_skips_extraction() {
1254            // has_injection_flags=true → extraction must be skipped; no counter in graph_metadata.
1255            let provider = mock_provider(vec![]);
1256            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1257            let pool = agent
1258                .services
1259                .memory
1260                .persistence
1261                .memory
1262                .as_ref()
1263                .unwrap()
1264                .sqlite()
1265                .pool()
1266                .clone();
1267
1268            agent
1269                .enqueue_graph_extraction_task("I use Rust", true, false)
1270                .await;
1271
1272            // Give any accidental spawn time to settle.
1273            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1274
1275            let store = GraphStore::new(pool);
1276            let count = store.get_metadata("extraction_count").await.unwrap();
1277            assert!(
1278                count.is_none(),
1279                "injection flag must prevent extraction_count from being written"
1280            );
1281        }
1282
1283        #[tokio::test]
1284        async fn disabled_config_guard_skips_extraction() {
1285            // graph.enabled=false → extraction must be skipped.
1286            let provider = mock_provider(vec![]);
1287            let disabled_cfg = GraphConfig {
1288                enabled: false,
1289                ..GraphConfig::default()
1290            };
1291            let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1292            let pool = agent
1293                .services
1294                .memory
1295                .persistence
1296                .memory
1297                .as_ref()
1298                .unwrap()
1299                .sqlite()
1300                .pool()
1301                .clone();
1302
1303            agent
1304                .enqueue_graph_extraction_task("I use Rust", false, false)
1305                .await;
1306
1307            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1308
1309            let store = GraphStore::new(pool);
1310            let count = store.get_metadata("extraction_count").await.unwrap();
1311            assert!(
1312                count.is_none(),
1313                "disabled graph config must prevent extraction"
1314            );
1315        }
1316
1317        #[tokio::test]
1318        async fn happy_path_fires_extraction() {
1319            // With enabled config and no injection flags, extraction is spawned.
1320            // MockProvider returns None (no entities), but the counter must be incremented.
1321            let provider = mock_provider(vec![]);
1322            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1323            let pool = agent
1324                .services
1325                .memory
1326                .persistence
1327                .memory
1328                .as_ref()
1329                .unwrap()
1330                .sqlite()
1331                .pool()
1332                .clone();
1333
1334            agent
1335                .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1336                .await;
1337
1338            // Wait for the spawned task to complete.
1339            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1340
1341            let store = GraphStore::new(pool);
1342            let count = store.get_metadata("extraction_count").await.unwrap();
1343            assert!(
1344                count.is_some(),
1345                "happy-path extraction must increment extraction_count"
1346            );
1347        }
1348
1349        #[tokio::test]
1350        async fn tool_result_parts_guard_skips_extraction() {
1351            // FIX-1 regression: has_tool_result_parts=true → extraction must be skipped.
1352            // Tool result messages contain raw structured output (TOML, JSON, code) — not
1353            // conversational content. Extracting entities from them produces graph noise.
1354            let provider = mock_provider(vec![]);
1355            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1356            let pool = agent
1357                .services
1358                .memory
1359                .persistence
1360                .memory
1361                .as_ref()
1362                .unwrap()
1363                .sqlite()
1364                .pool()
1365                .clone();
1366
1367            agent
1368                .enqueue_graph_extraction_task(
1369                    "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1370                    false,
1371                    true, // has_tool_result_parts
1372                )
1373                .await;
1374
1375            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1376
1377            let store = GraphStore::new(pool);
1378            let count = store.get_metadata("extraction_count").await.unwrap();
1379            assert!(
1380                count.is_none(),
1381                "tool result message must not trigger graph extraction"
1382            );
1383        }
1384
1385        #[tokio::test]
1386        async fn context_filter_excludes_tool_result_messages() {
1387            // FIX-2: context_messages must not include tool result user messages.
1388            // When enqueue_graph_extraction_task collects context, it filters out
1389            // Role::User messages that contain ToolResult parts — only conversational
1390            // user messages are included as extraction context.
1391            //
1392            // This test verifies the guard fires: a tool result message alone is passed
1393            // (has_tool_result_parts=true) → extraction is skipped entirely, so context
1394            // filtering is not exercised. We verify FIX-2 by ensuring a prior tool result
1395            // message in agent.msg.messages is excluded when a subsequent conversational message
1396            // triggers extraction.
1397            let provider = mock_provider(vec![]);
1398            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1399
1400            // Add a tool result message to the agent's message history — this simulates
1401            // a tool call response that arrived before the current conversational turn.
1402            agent.msg.messages.push(Message {
1403                role: Role::User,
1404                content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1405                parts: vec![MessagePart::ToolResult {
1406                    tool_use_id: "abc".to_owned(),
1407                    content: "provider_type = \"openai\"".to_owned(),
1408                    is_error: false,
1409                }],
1410                metadata: MessageMetadata::default(),
1411            });
1412
1413            let pool = agent
1414                .services
1415                .memory
1416                .persistence
1417                .memory
1418                .as_ref()
1419                .unwrap()
1420                .sqlite()
1421                .pool()
1422                .clone();
1423
1424            // Trigger extraction for a conversational message (not a tool result).
1425            agent
1426                .enqueue_graph_extraction_task(
1427                    "I prefer Rust for systems programming",
1428                    false,
1429                    false,
1430                )
1431                .await;
1432
1433            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1434
1435            // Extraction must have fired (conversational message, no injection flags).
1436            let store = GraphStore::new(pool);
1437            let count = store.get_metadata("extraction_count").await.unwrap();
1438            assert!(
1439                count.is_some(),
1440                "conversational message must trigger extraction even with prior tool result in history"
1441            );
1442        }
1443    }
1444
1445    // R-PERS-01: unit tests for enqueue_persona_extraction_task guard conditions.
1446    mod persona_extraction_guards {
1447        use super::*;
1448        use zeph_config::PersonaConfig;
1449        use zeph_llm::provider::MessageMetadata;
1450
1451        fn enabled_persona_config() -> PersonaConfig {
1452            PersonaConfig {
1453                enabled: true,
1454                min_messages: 1,
1455                ..PersonaConfig::default()
1456            }
1457        }
1458
1459        async fn agent_with_persona(
1460            provider: &AnyProvider,
1461            config: PersonaConfig,
1462        ) -> Agent<MockChannel> {
1463            let memory =
1464                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1465            let cid = memory.sqlite().create_conversation().await.unwrap();
1466            let mut agent = Agent::new(
1467                provider.clone(),
1468                MockChannel::new(vec![]),
1469                create_test_registry(),
1470                None,
1471                5,
1472                MockToolExecutor::no_tools(),
1473            )
1474            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1475            agent.services.memory.extraction.persona_config = config;
1476            agent
1477        }
1478
1479        #[tokio::test]
1480        async fn disabled_config_skips_spawn() {
1481            // persona.enabled=false → nothing is spawned; persona_memory stays empty.
1482            let provider = mock_provider(vec![]);
1483            let mut agent = agent_with_persona(
1484                &provider,
1485                PersonaConfig {
1486                    enabled: false,
1487                    ..PersonaConfig::default()
1488                },
1489            )
1490            .await;
1491
1492            // Inject a user message so message count is above threshold.
1493            agent.msg.messages.push(zeph_llm::provider::Message {
1494                role: Role::User,
1495                content: "I prefer Rust for systems programming".to_owned(),
1496                parts: vec![],
1497                metadata: MessageMetadata::default(),
1498            });
1499
1500            agent.enqueue_persona_extraction_task();
1501
1502            let store = agent
1503                .services
1504                .memory
1505                .persistence
1506                .memory
1507                .as_ref()
1508                .unwrap()
1509                .sqlite()
1510                .clone();
1511            let count = store.count_persona_facts().await.unwrap();
1512            assert_eq!(count, 0, "disabled persona config must not write any facts");
1513        }
1514
1515        #[tokio::test]
1516        async fn below_min_messages_skips_spawn() {
1517            // min_messages=3 but only 2 user messages → should skip.
1518            let provider = mock_provider(vec![]);
1519            let mut agent = agent_with_persona(
1520                &provider,
1521                PersonaConfig {
1522                    enabled: true,
1523                    min_messages: 3,
1524                    ..PersonaConfig::default()
1525                },
1526            )
1527            .await;
1528
1529            for text in ["I use Rust", "I prefer async code"] {
1530                agent.msg.messages.push(zeph_llm::provider::Message {
1531                    role: Role::User,
1532                    content: text.to_owned(),
1533                    parts: vec![],
1534                    metadata: MessageMetadata::default(),
1535                });
1536            }
1537
1538            agent.enqueue_persona_extraction_task();
1539
1540            let store = agent
1541                .services
1542                .memory
1543                .persistence
1544                .memory
1545                .as_ref()
1546                .unwrap()
1547                .sqlite()
1548                .clone();
1549            let count = store.count_persona_facts().await.unwrap();
1550            assert_eq!(
1551                count, 0,
1552                "below min_messages threshold must not trigger extraction"
1553            );
1554        }
1555
1556        #[tokio::test]
1557        async fn no_memory_skips_spawn() {
1558            // memory=None → must exit early without panic.
1559            let provider = mock_provider(vec![]);
1560            let channel = MockChannel::new(vec![]);
1561            let registry = create_test_registry();
1562            let executor = MockToolExecutor::no_tools();
1563            let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1564            agent.services.memory.extraction.persona_config = enabled_persona_config();
1565            agent.msg.messages.push(zeph_llm::provider::Message {
1566                role: Role::User,
1567                content: "I like Rust".to_owned(),
1568                parts: vec![],
1569                metadata: MessageMetadata::default(),
1570            });
1571
1572            // Must not panic even without memory.
1573            agent.enqueue_persona_extraction_task();
1574        }
1575
1576        #[tokio::test]
1577        async fn enabled_enough_messages_spawns_extraction() {
1578            // enabled=true, min_messages=1, self-referential message → extraction runs eagerly
1579            // (not fire-and-forget) and chat() is called on the provider, verified via MockProvider.
1580            use zeph_llm::mock::MockProvider;
1581            let (mock, recorded) = MockProvider::default().with_recording();
1582            let provider = AnyProvider::Mock(mock);
1583            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1584
1585            agent.msg.messages.push(zeph_llm::provider::Message {
1586                role: Role::User,
1587                content: "I prefer Rust for systems programming".to_owned(),
1588                parts: vec![],
1589                metadata: MessageMetadata::default(),
1590            });
1591
1592            agent.enqueue_persona_extraction_task();
1593
1594            // Persona extraction runs via BackgroundSupervisor. Wait for tasks to complete.
1595            agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1596
1597            let calls = recorded.lock().unwrap();
1598            assert!(
1599                !calls.is_empty(),
1600                "happy-path: provider.chat() must be called when extraction completes"
1601            );
1602        }
1603
1604        #[tokio::test]
1605        async fn messages_capped_at_eight() {
1606            // More than 8 user messages → only 8 are passed to extraction.
1607            // Each message contains "I" so self-referential gate passes.
1608            use zeph_llm::mock::MockProvider;
1609            let (mock, recorded) = MockProvider::default().with_recording();
1610            let provider = AnyProvider::Mock(mock);
1611            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1612
1613            for i in 0..12u32 {
1614                agent.msg.messages.push(zeph_llm::provider::Message {
1615                    role: Role::User,
1616                    content: format!("I like message {i}"),
1617                    parts: vec![],
1618                    metadata: MessageMetadata::default(),
1619                });
1620            }
1621
1622            agent.enqueue_persona_extraction_task();
1623
1624            // Allow the background task to complete before asserting.
1625            agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1626
1627            // Verify extraction ran (provider was called).
1628            let calls = recorded.lock().unwrap();
1629            assert!(
1630                !calls.is_empty(),
1631                "extraction must run when enough messages present"
1632            );
1633            // Verify the prompt sent to the provider does not contain messages beyond the 8th.
1634            let prompt = &calls[0];
1635            let user_text = prompt
1636                .iter()
1637                .filter(|m| m.role == Role::User)
1638                .map(|m| m.content.as_str())
1639                .collect::<Vec<_>>()
1640                .join(" ");
1641            // Messages 8..11 ("I like message 8".."I like message 11") must not appear.
1642            assert!(
1643                !user_text.contains("I like message 8"),
1644                "message index 8 must be excluded from extraction input"
1645            );
1646        }
1647
1648        #[test]
1649        fn long_message_truncated_at_char_boundary() {
1650            // Directly verify the per-message truncation logic applied in
1651            // enqueue_persona_extraction_task: a content > 2048 bytes must be capped
1652            // to exactly floor_char_boundary(2048).
1653            let long_content = "x".repeat(3000);
1654            let truncated = if long_content.len() > 2048 {
1655                long_content[..long_content.floor_char_boundary(2048)].to_owned()
1656            } else {
1657                long_content.clone()
1658            };
1659            assert_eq!(
1660                truncated.len(),
1661                2048,
1662                "ASCII content must be truncated to exactly 2048 bytes"
1663            );
1664
1665            // Multi-byte boundary: build a string whose char boundary falls before 2048.
1666            let multi = "é".repeat(1500); // each 'é' is 2 bytes → 3000 bytes total
1667            let truncated_multi = if multi.len() > 2048 {
1668                multi[..multi.floor_char_boundary(2048)].to_owned()
1669            } else {
1670                multi.clone()
1671            };
1672            assert!(
1673                truncated_multi.len() <= 2048,
1674                "multi-byte content must not exceed 2048 bytes"
1675            );
1676            assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1677        }
1678    }
1679
1680    #[tokio::test]
1681    async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1682        let provider = mock_provider(vec![]);
1683        let channel = MockChannel::new(vec![]);
1684        let registry = create_test_registry();
1685        let executor = MockToolExecutor::no_tools();
1686
1687        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1688        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1689        let cid = memory.sqlite().create_conversation().await.unwrap();
1690
1691        // autosave_assistant=false — but User role always takes embedding path
1692        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1693            .with_metrics(tx)
1694            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1695        agent.services.memory.persistence.autosave_assistant = false;
1696        agent.services.memory.persistence.autosave_min_length = 20;
1697
1698        let long_user_msg = "A".repeat(100);
1699        agent
1700            .persist_message(Role::User, &long_user_msg, &[], false)
1701            .await;
1702
1703        let history = agent
1704            .services
1705            .memory
1706            .persistence
1707            .memory
1708            .as_ref()
1709            .unwrap()
1710            .sqlite()
1711            .load_history(cid, 50)
1712            .await
1713            .unwrap();
1714        assert_eq!(history.len(), 1, "user message must be saved");
1715        // User messages go through remember_with_parts (embedding path).
1716        // sqlite_message_count must increment regardless of Qdrant availability.
1717        assert_eq!(rx.borrow().sqlite_message_count, 1);
1718    }
1719
1720    // Round-trip tests: verify that persist_message saves the correct parts and they
1721    // are restored correctly by load_history.
1722
1723    #[tokio::test]
1724    async fn persist_message_saves_correct_tool_use_parts() {
1725        use zeph_llm::provider::MessagePart;
1726
1727        let provider = mock_provider(vec![]);
1728        let channel = MockChannel::new(vec![]);
1729        let registry = create_test_registry();
1730        let executor = MockToolExecutor::no_tools();
1731
1732        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1733        let cid = memory.sqlite().create_conversation().await.unwrap();
1734
1735        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1736            std::sync::Arc::new(memory),
1737            cid,
1738            50,
1739            5,
1740            100,
1741        );
1742
1743        let parts = vec![MessagePart::ToolUse {
1744            id: "call_abc123".to_string(),
1745            name: "read_file".to_string(),
1746            input: serde_json::json!({"path": "/tmp/test.txt"}),
1747        }];
1748        let content = "[tool_use: read_file(call_abc123)]";
1749
1750        agent
1751            .persist_message(Role::Assistant, content, &parts, false)
1752            .await;
1753
1754        let history = agent
1755            .services
1756            .memory
1757            .persistence
1758            .memory
1759            .as_ref()
1760            .unwrap()
1761            .sqlite()
1762            .load_history(cid, 50)
1763            .await
1764            .unwrap();
1765
1766        assert_eq!(history.len(), 1);
1767        assert_eq!(history[0].role, Role::Assistant);
1768        assert_eq!(history[0].content, content);
1769        assert_eq!(history[0].parts.len(), 1);
1770        match &history[0].parts[0] {
1771            MessagePart::ToolUse { id, name, .. } => {
1772                assert_eq!(id, "call_abc123");
1773                assert_eq!(name, "read_file");
1774            }
1775            other => panic!("expected ToolUse part, got {other:?}"),
1776        }
1777        // Regression guard: assistant message must NOT have ToolResult parts
1778        assert!(
1779            !history[0]
1780                .parts
1781                .iter()
1782                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1783            "assistant message must not contain ToolResult parts"
1784        );
1785    }
1786
1787    #[tokio::test]
1788    async fn persist_message_saves_correct_tool_result_parts() {
1789        use zeph_llm::provider::MessagePart;
1790
1791        let provider = mock_provider(vec![]);
1792        let channel = MockChannel::new(vec![]);
1793        let registry = create_test_registry();
1794        let executor = MockToolExecutor::no_tools();
1795
1796        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1797        let cid = memory.sqlite().create_conversation().await.unwrap();
1798
1799        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1800            std::sync::Arc::new(memory),
1801            cid,
1802            50,
1803            5,
1804            100,
1805        );
1806
1807        let parts = vec![MessagePart::ToolResult {
1808            tool_use_id: "call_abc123".to_string(),
1809            content: "file contents here".to_string(),
1810            is_error: false,
1811        }];
1812        let content = "[tool_result: call_abc123]\nfile contents here";
1813
1814        agent
1815            .persist_message(Role::User, content, &parts, false)
1816            .await;
1817
1818        let history = agent
1819            .services
1820            .memory
1821            .persistence
1822            .memory
1823            .as_ref()
1824            .unwrap()
1825            .sqlite()
1826            .load_history(cid, 50)
1827            .await
1828            .unwrap();
1829
1830        assert_eq!(history.len(), 1);
1831        assert_eq!(history[0].role, Role::User);
1832        assert_eq!(history[0].content, content);
1833        assert_eq!(history[0].parts.len(), 1);
1834        match &history[0].parts[0] {
1835            MessagePart::ToolResult {
1836                tool_use_id,
1837                content: result_content,
1838                is_error,
1839            } => {
1840                assert_eq!(tool_use_id, "call_abc123");
1841                assert_eq!(result_content, "file contents here");
1842                assert!(!is_error);
1843            }
1844            other => panic!("expected ToolResult part, got {other:?}"),
1845        }
1846        // Regression guard: user message with ToolResult must NOT have ToolUse parts
1847        assert!(
1848            !history[0]
1849                .parts
1850                .iter()
1851                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1852            "user ToolResult message must not contain ToolUse parts"
1853        );
1854    }
1855
1856    #[tokio::test]
1857    async fn persist_message_roundtrip_preserves_role_part_alignment() {
1858        use zeph_llm::provider::MessagePart;
1859
1860        let provider = mock_provider(vec![]);
1861        let channel = MockChannel::new(vec![]);
1862        let registry = create_test_registry();
1863        let executor = MockToolExecutor::no_tools();
1864
1865        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1866        let cid = memory.sqlite().create_conversation().await.unwrap();
1867
1868        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1869            std::sync::Arc::new(memory),
1870            cid,
1871            50,
1872            5,
1873            100,
1874        );
1875
1876        // Persist assistant message with ToolUse parts
1877        let assistant_parts = vec![MessagePart::ToolUse {
1878            id: "id_1".to_string(),
1879            name: "list_dir".to_string(),
1880            input: serde_json::json!({"path": "/tmp"}),
1881        }];
1882        agent
1883            .persist_message(
1884                Role::Assistant,
1885                "[tool_use: list_dir(id_1)]",
1886                &assistant_parts,
1887                false,
1888            )
1889            .await;
1890
1891        // Persist user message with ToolResult parts
1892        let user_parts = vec![MessagePart::ToolResult {
1893            tool_use_id: "id_1".to_string(),
1894            content: "file1.txt\nfile2.txt".to_string(),
1895            is_error: false,
1896        }];
1897        agent
1898            .persist_message(
1899                Role::User,
1900                "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1901                &user_parts,
1902                false,
1903            )
1904            .await;
1905
1906        let history = agent
1907            .services
1908            .memory
1909            .persistence
1910            .memory
1911            .as_ref()
1912            .unwrap()
1913            .sqlite()
1914            .load_history(cid, 50)
1915            .await
1916            .unwrap();
1917
1918        assert_eq!(history.len(), 2);
1919
1920        // First message: assistant + ToolUse
1921        assert_eq!(history[0].role, Role::Assistant);
1922        assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1923        assert!(
1924            matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1925            "first message must be assistant ToolUse"
1926        );
1927
1928        // Second message: user + ToolResult
1929        assert_eq!(history[1].role, Role::User);
1930        assert_eq!(
1931            history[1].content,
1932            "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1933        );
1934        assert!(
1935            matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1936            "second message must be user ToolResult"
1937        );
1938
1939        // Cross-role regression guard: no swapped parts
1940        assert!(
1941            !history[0]
1942                .parts
1943                .iter()
1944                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1945            "assistant message must not have ToolResult parts"
1946        );
1947        assert!(
1948            !history[1]
1949                .parts
1950                .iter()
1951                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1952            "user message must not have ToolUse parts"
1953        );
1954    }
1955
1956    #[tokio::test]
1957    async fn persist_message_saves_correct_tool_output_parts() {
1958        use zeph_llm::provider::MessagePart;
1959
1960        let provider = mock_provider(vec![]);
1961        let channel = MockChannel::new(vec![]);
1962        let registry = create_test_registry();
1963        let executor = MockToolExecutor::no_tools();
1964
1965        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1966        let cid = memory.sqlite().create_conversation().await.unwrap();
1967
1968        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1969            std::sync::Arc::new(memory),
1970            cid,
1971            50,
1972            5,
1973            100,
1974        );
1975
1976        let parts = vec![MessagePart::ToolOutput {
1977            tool_name: "shell".into(),
1978            body: "hello from shell".to_string(),
1979            compacted_at: None,
1980        }];
1981        let content = "[tool: shell]\nhello from shell";
1982
1983        agent
1984            .persist_message(Role::User, content, &parts, false)
1985            .await;
1986
1987        let history = agent
1988            .services
1989            .memory
1990            .persistence
1991            .memory
1992            .as_ref()
1993            .unwrap()
1994            .sqlite()
1995            .load_history(cid, 50)
1996            .await
1997            .unwrap();
1998
1999        assert_eq!(history.len(), 1);
2000        assert_eq!(history[0].role, Role::User);
2001        assert_eq!(history[0].content, content);
2002        assert_eq!(history[0].parts.len(), 1);
2003        match &history[0].parts[0] {
2004            MessagePart::ToolOutput {
2005                tool_name,
2006                body,
2007                compacted_at,
2008            } => {
2009                assert_eq!(tool_name, "shell");
2010                assert_eq!(body, "hello from shell");
2011                assert!(compacted_at.is_none());
2012            }
2013            other => panic!("expected ToolOutput part, got {other:?}"),
2014        }
2015    }
2016
2017    // --- sanitize_tool_pairs unit tests ---
2018
2019    #[tokio::test]
2020    async fn load_history_removes_trailing_orphan_tool_use() {
2021        use zeph_llm::provider::MessagePart;
2022
2023        let provider = mock_provider(vec![]);
2024        let channel = MockChannel::new(vec![]);
2025        let registry = create_test_registry();
2026        let executor = MockToolExecutor::no_tools();
2027
2028        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2029        let cid = memory.sqlite().create_conversation().await.unwrap();
2030        let sqlite = memory.sqlite();
2031
2032        // user message (normal)
2033        sqlite
2034            .save_message(cid, "user", "do something with a tool")
2035            .await
2036            .unwrap();
2037
2038        // assistant message with ToolUse parts — no following tool_result (orphan)
2039        let parts = serde_json::to_string(&[MessagePart::ToolUse {
2040            id: "call_orphan".to_string(),
2041            name: "shell".to_string(),
2042            input: serde_json::json!({"command": "ls"}),
2043        }])
2044        .unwrap();
2045        sqlite
2046            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2047            .await
2048            .unwrap();
2049
2050        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2051            std::sync::Arc::new(memory),
2052            cid,
2053            50,
2054            5,
2055            100,
2056        );
2057
2058        let messages_before = agent.msg.messages.len();
2059        agent.load_history().await.unwrap();
2060
2061        // Only the user message should be loaded; orphaned assistant tool_use removed.
2062        assert_eq!(
2063            agent.msg.messages.len(),
2064            messages_before + 1,
2065            "orphaned trailing tool_use must be removed"
2066        );
2067        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2068    }
2069
2070    #[tokio::test]
2071    async fn load_history_removes_leading_orphan_tool_result() {
2072        use zeph_llm::provider::MessagePart;
2073
2074        let provider = mock_provider(vec![]);
2075        let channel = MockChannel::new(vec![]);
2076        let registry = create_test_registry();
2077        let executor = MockToolExecutor::no_tools();
2078
2079        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2080        let cid = memory.sqlite().create_conversation().await.unwrap();
2081        let sqlite = memory.sqlite();
2082
2083        // Leading orphan: user message with ToolResult but no preceding tool_use
2084        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2085            tool_use_id: "call_missing".to_string(),
2086            content: "result data".to_string(),
2087            is_error: false,
2088        }])
2089        .unwrap();
2090        sqlite
2091            .save_message_with_parts(
2092                cid,
2093                "user",
2094                "[tool_result: call_missing]\nresult data",
2095                &result_parts,
2096            )
2097            .await
2098            .unwrap();
2099
2100        // A valid assistant reply after the orphan
2101        sqlite
2102            .save_message(cid, "assistant", "here is my response")
2103            .await
2104            .unwrap();
2105
2106        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2107            std::sync::Arc::new(memory),
2108            cid,
2109            50,
2110            5,
2111            100,
2112        );
2113
2114        let messages_before = agent.msg.messages.len();
2115        agent.load_history().await.unwrap();
2116
2117        // Orphaned leading tool_result removed; only assistant message kept.
2118        assert_eq!(
2119            agent.msg.messages.len(),
2120            messages_before + 1,
2121            "orphaned leading tool_result must be removed"
2122        );
2123        assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2124    }
2125
2126    #[tokio::test]
2127    async fn load_history_preserves_complete_tool_pairs() {
2128        use zeph_llm::provider::MessagePart;
2129
2130        let provider = mock_provider(vec![]);
2131        let channel = MockChannel::new(vec![]);
2132        let registry = create_test_registry();
2133        let executor = MockToolExecutor::no_tools();
2134
2135        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2136        let cid = memory.sqlite().create_conversation().await.unwrap();
2137        let sqlite = memory.sqlite();
2138
2139        // Complete tool_use / tool_result pair
2140        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2141            id: "call_ok".to_string(),
2142            name: "shell".to_string(),
2143            input: serde_json::json!({"command": "pwd"}),
2144        }])
2145        .unwrap();
2146        sqlite
2147            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2148            .await
2149            .unwrap();
2150
2151        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2152            tool_use_id: "call_ok".to_string(),
2153            content: "/home/user".to_string(),
2154            is_error: false,
2155        }])
2156        .unwrap();
2157        sqlite
2158            .save_message_with_parts(
2159                cid,
2160                "user",
2161                "[tool_result: call_ok]\n/home/user",
2162                &result_parts,
2163            )
2164            .await
2165            .unwrap();
2166
2167        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2168            std::sync::Arc::new(memory),
2169            cid,
2170            50,
2171            5,
2172            100,
2173        );
2174
2175        let messages_before = agent.msg.messages.len();
2176        agent.load_history().await.unwrap();
2177
2178        // Both messages must be preserved.
2179        assert_eq!(
2180            agent.msg.messages.len(),
2181            messages_before + 2,
2182            "complete tool_use/tool_result pair must be preserved"
2183        );
2184        assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2185        assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2186    }
2187
2188    #[tokio::test]
2189    async fn load_history_handles_multiple_trailing_orphans() {
2190        use zeph_llm::provider::MessagePart;
2191
2192        let provider = mock_provider(vec![]);
2193        let channel = MockChannel::new(vec![]);
2194        let registry = create_test_registry();
2195        let executor = MockToolExecutor::no_tools();
2196
2197        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2198        let cid = memory.sqlite().create_conversation().await.unwrap();
2199        let sqlite = memory.sqlite();
2200
2201        // Normal user message
2202        sqlite.save_message(cid, "user", "start").await.unwrap();
2203
2204        // First orphaned tool_use
2205        let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2206            id: "call_1".to_string(),
2207            name: "shell".to_string(),
2208            input: serde_json::json!({}),
2209        }])
2210        .unwrap();
2211        sqlite
2212            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2213            .await
2214            .unwrap();
2215
2216        // Second orphaned tool_use (consecutive, no tool_result between them)
2217        let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2218            id: "call_2".to_string(),
2219            name: "read_file".to_string(),
2220            input: serde_json::json!({}),
2221        }])
2222        .unwrap();
2223        sqlite
2224            .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2225            .await
2226            .unwrap();
2227
2228        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2229            std::sync::Arc::new(memory),
2230            cid,
2231            50,
2232            5,
2233            100,
2234        );
2235
2236        let messages_before = agent.msg.messages.len();
2237        agent.load_history().await.unwrap();
2238
2239        // Both orphaned tool_use messages removed; only the user message kept.
2240        assert_eq!(
2241            agent.msg.messages.len(),
2242            messages_before + 1,
2243            "all trailing orphaned tool_use messages must be removed"
2244        );
2245        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2246    }
2247
2248    #[tokio::test]
2249    async fn load_history_no_tool_messages_unchanged() {
2250        let provider = mock_provider(vec![]);
2251        let channel = MockChannel::new(vec![]);
2252        let registry = create_test_registry();
2253        let executor = MockToolExecutor::no_tools();
2254
2255        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2256        let cid = memory.sqlite().create_conversation().await.unwrap();
2257        let sqlite = memory.sqlite();
2258
2259        sqlite.save_message(cid, "user", "hello").await.unwrap();
2260        sqlite
2261            .save_message(cid, "assistant", "hi there")
2262            .await
2263            .unwrap();
2264        sqlite
2265            .save_message(cid, "user", "how are you?")
2266            .await
2267            .unwrap();
2268
2269        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2270            std::sync::Arc::new(memory),
2271            cid,
2272            50,
2273            5,
2274            100,
2275        );
2276
2277        let messages_before = agent.msg.messages.len();
2278        agent.load_history().await.unwrap();
2279
2280        // All three plain messages must be preserved.
2281        assert_eq!(
2282            agent.msg.messages.len(),
2283            messages_before + 3,
2284            "plain messages without tool parts must pass through unchanged"
2285        );
2286    }
2287
2288    #[tokio::test]
2289    async fn load_history_removes_both_leading_and_trailing_orphans() {
2290        use zeph_llm::provider::MessagePart;
2291
2292        let provider = mock_provider(vec![]);
2293        let channel = MockChannel::new(vec![]);
2294        let registry = create_test_registry();
2295        let executor = MockToolExecutor::no_tools();
2296
2297        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2298        let cid = memory.sqlite().create_conversation().await.unwrap();
2299        let sqlite = memory.sqlite();
2300
2301        // Leading orphan: user message with ToolResult, no preceding tool_use
2302        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2303            tool_use_id: "call_leading".to_string(),
2304            content: "orphaned result".to_string(),
2305            is_error: false,
2306        }])
2307        .unwrap();
2308        sqlite
2309            .save_message_with_parts(
2310                cid,
2311                "user",
2312                "[tool_result: call_leading]\norphaned result",
2313                &result_parts,
2314            )
2315            .await
2316            .unwrap();
2317
2318        // Valid middle messages
2319        sqlite
2320            .save_message(cid, "user", "what is 2+2?")
2321            .await
2322            .unwrap();
2323        sqlite.save_message(cid, "assistant", "4").await.unwrap();
2324
2325        // Trailing orphan: assistant message with ToolUse, no following tool_result
2326        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2327            id: "call_trailing".to_string(),
2328            name: "shell".to_string(),
2329            input: serde_json::json!({"command": "date"}),
2330        }])
2331        .unwrap();
2332        sqlite
2333            .save_message_with_parts(
2334                cid,
2335                "assistant",
2336                "[tool_use: shell(call_trailing)]",
2337                &use_parts,
2338            )
2339            .await
2340            .unwrap();
2341
2342        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2343            std::sync::Arc::new(memory),
2344            cid,
2345            50,
2346            5,
2347            100,
2348        );
2349
2350        let messages_before = agent.msg.messages.len();
2351        agent.load_history().await.unwrap();
2352
2353        // Both orphans removed; only the 2 valid middle messages kept.
2354        assert_eq!(
2355            agent.msg.messages.len(),
2356            messages_before + 2,
2357            "both leading and trailing orphans must be removed"
2358        );
2359        assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2360        assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2361        assert_eq!(
2362            agent.msg.messages[messages_before + 1].role,
2363            Role::Assistant
2364        );
2365        assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2366    }
2367
2368    /// RC1 regression: mid-history assistant[`ToolUse`] without a following user[`ToolResult`]
2369    /// must have its `ToolUse` parts stripped (text preserved). The message count stays the same
2370    /// because the assistant message has a text content fallback; only `ToolUse` parts are
2371    /// removed.
2372    #[tokio::test]
2373    async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2374        use zeph_llm::provider::MessagePart;
2375
2376        let provider = mock_provider(vec![]);
2377        let channel = MockChannel::new(vec![]);
2378        let registry = create_test_registry();
2379        let executor = MockToolExecutor::no_tools();
2380
2381        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2382        let cid = memory.sqlite().create_conversation().await.unwrap();
2383        let sqlite = memory.sqlite();
2384
2385        // Normal first exchange.
2386        sqlite
2387            .save_message(cid, "user", "first question")
2388            .await
2389            .unwrap();
2390        sqlite
2391            .save_message(cid, "assistant", "first answer")
2392            .await
2393            .unwrap();
2394
2395        // Mid-history orphan: assistant with ToolUse but NO following ToolResult user message.
2396        // This models the compaction-split scenario (RC2) where replace_conversation hid the
2397        // user[ToolResult] but left the assistant[ToolUse] visible.
2398        let use_parts = serde_json::to_string(&[
2399            MessagePart::ToolUse {
2400                id: "call_mid_1".to_string(),
2401                name: "shell".to_string(),
2402                input: serde_json::json!({"command": "ls"}),
2403            },
2404            MessagePart::Text {
2405                text: "Let me check the files.".to_string(),
2406            },
2407        ])
2408        .unwrap();
2409        sqlite
2410            .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2411            .await
2412            .unwrap();
2413
2414        // Another normal exchange after the orphan.
2415        sqlite
2416            .save_message(cid, "user", "second question")
2417            .await
2418            .unwrap();
2419        sqlite
2420            .save_message(cid, "assistant", "second answer")
2421            .await
2422            .unwrap();
2423
2424        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2425            std::sync::Arc::new(memory),
2426            cid,
2427            50,
2428            5,
2429            100,
2430        );
2431
2432        let messages_before = agent.msg.messages.len();
2433        agent.load_history().await.unwrap();
2434
2435        // All 5 messages remain (orphan message kept because it has text), but the orphaned
2436        // message must have its ToolUse parts stripped.
2437        assert_eq!(
2438            agent.msg.messages.len(),
2439            messages_before + 5,
2440            "message count must be 5 (orphan message kept — has text content)"
2441        );
2442
2443        // The orphaned assistant message (index 2 in the loaded slice) must have no ToolUse parts.
2444        let orphan = &agent.msg.messages[messages_before + 2];
2445        assert_eq!(orphan.role, Role::Assistant);
2446        assert!(
2447            !orphan
2448                .parts
2449                .iter()
2450                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2451            "orphaned ToolUse parts must be stripped from mid-history message"
2452        );
2453        // Text part must be preserved.
2454        assert!(
2455            orphan.parts.iter().any(
2456                |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2457            ),
2458            "text content of orphaned assistant message must be preserved"
2459        );
2460    }
2461
2462    /// RC3 regression: a user message with empty `content` but non-empty `parts` (`ToolResult`)
2463    /// must NOT be skipped by `load_history`. Previously the empty-content check dropped these
2464    /// messages before `sanitize_tool_pairs` ran, leaving the preceding assistant `ToolUse`
2465    /// orphaned.
2466    #[tokio::test]
2467    async fn load_history_keeps_tool_only_user_message() {
2468        use zeph_llm::provider::MessagePart;
2469
2470        let provider = mock_provider(vec![]);
2471        let channel = MockChannel::new(vec![]);
2472        let registry = create_test_registry();
2473        let executor = MockToolExecutor::no_tools();
2474
2475        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2476        let cid = memory.sqlite().create_conversation().await.unwrap();
2477        let sqlite = memory.sqlite();
2478
2479        // Assistant sends a ToolUse.
2480        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2481            id: "call_rc3".to_string(),
2482            name: "memory_save".to_string(),
2483            input: serde_json::json!({"content": "something"}),
2484        }])
2485        .unwrap();
2486        sqlite
2487            .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2488            .await
2489            .unwrap();
2490
2491        // User message has empty text content but carries ToolResult in parts — native tool pattern.
2492        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2493            tool_use_id: "call_rc3".to_string(),
2494            content: "saved".to_string(),
2495            is_error: false,
2496        }])
2497        .unwrap();
2498        sqlite
2499            .save_message_with_parts(cid, "user", "", &result_parts)
2500            .await
2501            .unwrap();
2502
2503        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2504
2505        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2506            std::sync::Arc::new(memory),
2507            cid,
2508            50,
2509            5,
2510            100,
2511        );
2512
2513        let messages_before = agent.msg.messages.len();
2514        agent.load_history().await.unwrap();
2515
2516        // All 3 messages must be loaded — the empty-content ToolResult user message must NOT be
2517        // dropped.
2518        assert_eq!(
2519            agent.msg.messages.len(),
2520            messages_before + 3,
2521            "user message with empty content but ToolResult parts must not be dropped"
2522        );
2523
2524        // The user message at index 1 must still carry the ToolResult part.
2525        let user_msg = &agent.msg.messages[messages_before + 1];
2526        assert_eq!(user_msg.role, Role::User);
2527        assert!(
2528            user_msg.parts.iter().any(
2529                |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2530            ),
2531            "ToolResult part must be preserved on user message with empty content"
2532        );
2533    }
2534
2535    /// RC2 reverse pass: a user message with a `ToolResult` whose `tool_use_id` has no matching
2536    /// `ToolUse` in the preceding assistant message must be stripped by
2537    /// `strip_mid_history_orphans`.
2538    #[tokio::test]
2539    async fn strip_orphans_removes_orphaned_tool_result() {
2540        use zeph_llm::provider::MessagePart;
2541
2542        let provider = mock_provider(vec![]);
2543        let channel = MockChannel::new(vec![]);
2544        let registry = create_test_registry();
2545        let executor = MockToolExecutor::no_tools();
2546
2547        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2548        let cid = memory.sqlite().create_conversation().await.unwrap();
2549        let sqlite = memory.sqlite();
2550
2551        // Normal exchange before the orphan.
2552        sqlite.save_message(cid, "user", "hello").await.unwrap();
2553        sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2554
2555        // Assistant message that does NOT contain a ToolUse.
2556        sqlite
2557            .save_message(cid, "assistant", "plain answer")
2558            .await
2559            .unwrap();
2560
2561        // User message references a tool_use_id that was never sent by the preceding assistant.
2562        let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2563            tool_use_id: "call_nonexistent".to_string(),
2564            content: "stale result".to_string(),
2565            is_error: false,
2566        }])
2567        .unwrap();
2568        sqlite
2569            .save_message_with_parts(
2570                cid,
2571                "user",
2572                "[tool_result: call_nonexistent]\nstale result",
2573                &orphan_result_parts,
2574            )
2575            .await
2576            .unwrap();
2577
2578        sqlite
2579            .save_message(cid, "assistant", "final")
2580            .await
2581            .unwrap();
2582
2583        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2584            std::sync::Arc::new(memory),
2585            cid,
2586            50,
2587            5,
2588            100,
2589        );
2590
2591        let messages_before = agent.msg.messages.len();
2592        agent.load_history().await.unwrap();
2593
2594        // The orphaned ToolResult part must have been stripped from the user message.
2595        // The user message itself may be removed (parts empty + content non-empty) or kept with
2596        // the text content — but it must NOT retain the orphaned ToolResult part.
2597        let loaded = &agent.msg.messages[messages_before..];
2598        for msg in loaded {
2599            assert!(
2600                !msg.parts.iter().any(|p| matches!(
2601                    p,
2602                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2603                )),
2604                "orphaned ToolResult part must be stripped from history"
2605            );
2606        }
2607    }
2608
2609    /// RC2 reverse pass: a complete `tool_use` + `tool_result` pair must pass through the reverse
2610    /// orphan check intact; the fix must not strip valid `ToolResult` parts.
2611    #[tokio::test]
2612    async fn strip_orphans_keeps_complete_pair() {
2613        use zeph_llm::provider::MessagePart;
2614
2615        let provider = mock_provider(vec![]);
2616        let channel = MockChannel::new(vec![]);
2617        let registry = create_test_registry();
2618        let executor = MockToolExecutor::no_tools();
2619
2620        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2621        let cid = memory.sqlite().create_conversation().await.unwrap();
2622        let sqlite = memory.sqlite();
2623
2624        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2625            id: "call_valid".to_string(),
2626            name: "shell".to_string(),
2627            input: serde_json::json!({"command": "ls"}),
2628        }])
2629        .unwrap();
2630        sqlite
2631            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2632            .await
2633            .unwrap();
2634
2635        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2636            tool_use_id: "call_valid".to_string(),
2637            content: "file.rs".to_string(),
2638            is_error: false,
2639        }])
2640        .unwrap();
2641        sqlite
2642            .save_message_with_parts(cid, "user", "", &result_parts)
2643            .await
2644            .unwrap();
2645
2646        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2647            std::sync::Arc::new(memory),
2648            cid,
2649            50,
2650            5,
2651            100,
2652        );
2653
2654        let messages_before = agent.msg.messages.len();
2655        agent.load_history().await.unwrap();
2656
2657        assert_eq!(
2658            agent.msg.messages.len(),
2659            messages_before + 2,
2660            "complete tool_use/tool_result pair must be preserved"
2661        );
2662
2663        let user_msg = &agent.msg.messages[messages_before + 1];
2664        assert!(
2665            user_msg.parts.iter().any(|p| matches!(
2666                p,
2667                MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2668            )),
2669            "ToolResult part for a matched tool_use must not be stripped"
2670        );
2671    }
2672
2673    /// RC2 reverse pass: history with a mix of complete pairs and orphaned `ToolResult` messages.
2674    /// Orphaned `ToolResult` parts must be stripped; complete pairs must pass through intact.
2675    #[tokio::test]
2676    async fn strip_orphans_mixed_history() {
2677        use zeph_llm::provider::MessagePart;
2678
2679        let provider = mock_provider(vec![]);
2680        let channel = MockChannel::new(vec![]);
2681        let registry = create_test_registry();
2682        let executor = MockToolExecutor::no_tools();
2683
2684        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2685        let cid = memory.sqlite().create_conversation().await.unwrap();
2686        let sqlite = memory.sqlite();
2687
2688        // First: complete tool_use / tool_result pair.
2689        let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2690            id: "call_good".to_string(),
2691            name: "shell".to_string(),
2692            input: serde_json::json!({"command": "pwd"}),
2693        }])
2694        .unwrap();
2695        sqlite
2696            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2697            .await
2698            .unwrap();
2699
2700        let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2701            tool_use_id: "call_good".to_string(),
2702            content: "/home".to_string(),
2703            is_error: false,
2704        }])
2705        .unwrap();
2706        sqlite
2707            .save_message_with_parts(cid, "user", "", &result_parts_ok)
2708            .await
2709            .unwrap();
2710
2711        // Second: plain assistant message followed by an orphaned ToolResult user message.
2712        sqlite
2713            .save_message(cid, "assistant", "text only")
2714            .await
2715            .unwrap();
2716
2717        let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2718            tool_use_id: "call_ghost".to_string(),
2719            content: "ghost result".to_string(),
2720            is_error: false,
2721        }])
2722        .unwrap();
2723        sqlite
2724            .save_message_with_parts(
2725                cid,
2726                "user",
2727                "[tool_result: call_ghost]\nghost result",
2728                &orphan_parts,
2729            )
2730            .await
2731            .unwrap();
2732
2733        sqlite
2734            .save_message(cid, "assistant", "final reply")
2735            .await
2736            .unwrap();
2737
2738        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2739            std::sync::Arc::new(memory),
2740            cid,
2741            50,
2742            5,
2743            100,
2744        );
2745
2746        let messages_before = agent.msg.messages.len();
2747        agent.load_history().await.unwrap();
2748
2749        let loaded = &agent.msg.messages[messages_before..];
2750
2751        // The orphaned ToolResult part must not appear in any message.
2752        for msg in loaded {
2753            assert!(
2754                !msg.parts.iter().any(|p| matches!(
2755                    p,
2756                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2757                )),
2758                "orphaned ToolResult (call_ghost) must be stripped from history"
2759            );
2760        }
2761
2762        // The matched ToolResult must still be present on the user message following the
2763        // first assistant message.
2764        let has_good_result = loaded.iter().any(|msg| {
2765            msg.role == Role::User
2766                && msg.parts.iter().any(|p| {
2767                    matches!(
2768                        p,
2769                        MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2770                    )
2771                })
2772        });
2773        assert!(
2774            has_good_result,
2775            "matched ToolResult (call_good) must be preserved in history"
2776        );
2777    }
2778
2779    /// Regression: a properly matched `tool_use`/`tool_result` pair must NOT be touched by the
2780    /// mid-history scan — ensures the fix doesn't break valid tool exchanges.
2781    #[tokio::test]
2782    async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2783        use zeph_llm::provider::MessagePart;
2784
2785        let provider = mock_provider(vec![]);
2786        let channel = MockChannel::new(vec![]);
2787        let registry = create_test_registry();
2788        let executor = MockToolExecutor::no_tools();
2789
2790        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2791        let cid = memory.sqlite().create_conversation().await.unwrap();
2792        let sqlite = memory.sqlite();
2793
2794        sqlite
2795            .save_message(cid, "user", "run a command")
2796            .await
2797            .unwrap();
2798
2799        // Assistant sends a ToolUse.
2800        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2801            id: "call_ok".to_string(),
2802            name: "shell".to_string(),
2803            input: serde_json::json!({"command": "echo hi"}),
2804        }])
2805        .unwrap();
2806        sqlite
2807            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2808            .await
2809            .unwrap();
2810
2811        // Matching user ToolResult follows.
2812        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2813            tool_use_id: "call_ok".to_string(),
2814            content: "hi".to_string(),
2815            is_error: false,
2816        }])
2817        .unwrap();
2818        sqlite
2819            .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2820            .await
2821            .unwrap();
2822
2823        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2824
2825        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2826            std::sync::Arc::new(memory),
2827            cid,
2828            50,
2829            5,
2830            100,
2831        );
2832
2833        let messages_before = agent.msg.messages.len();
2834        agent.load_history().await.unwrap();
2835
2836        // All 4 messages preserved, tool_use parts intact.
2837        assert_eq!(
2838            agent.msg.messages.len(),
2839            messages_before + 4,
2840            "matched tool pair must not be removed"
2841        );
2842        let tool_msg = &agent.msg.messages[messages_before + 1];
2843        assert!(
2844            tool_msg
2845                .parts
2846                .iter()
2847                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2848            "matched ToolUse parts must be preserved"
2849        );
2850    }
2851
2852    /// RC5: `persist_cancelled_tool_results` must persist a tombstone user message containing
2853    /// `is_error=true` `ToolResult` parts for all `tool_calls` IDs so the preceding assistant
2854    /// `ToolUse` is never orphaned in the DB after a cancellation.
2855    #[tokio::test]
2856    async fn persist_cancelled_tool_results_pairs_tool_use() {
2857        use zeph_llm::provider::MessagePart;
2858
2859        let provider = mock_provider(vec![]);
2860        let channel = MockChannel::new(vec![]);
2861        let registry = create_test_registry();
2862        let executor = MockToolExecutor::no_tools();
2863
2864        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2865        let cid = memory.sqlite().create_conversation().await.unwrap();
2866
2867        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2868            std::sync::Arc::new(memory),
2869            cid,
2870            50,
2871            5,
2872            100,
2873        );
2874
2875        // Simulate: assistant message with two ToolUse parts already persisted.
2876        let tool_calls = vec![
2877            zeph_llm::provider::ToolUseRequest {
2878                id: "cancel_id_1".to_string(),
2879                name: "shell".to_string().into(),
2880                input: serde_json::json!({}),
2881            },
2882            zeph_llm::provider::ToolUseRequest {
2883                id: "cancel_id_2".to_string(),
2884                name: "read_file".to_string().into(),
2885                input: serde_json::json!({}),
2886            },
2887        ];
2888
2889        agent.persist_cancelled_tool_results(&tool_calls).await;
2890
2891        let history = agent
2892            .services
2893            .memory
2894            .persistence
2895            .memory
2896            .as_ref()
2897            .unwrap()
2898            .sqlite()
2899            .load_history(cid, 50)
2900            .await
2901            .unwrap();
2902
2903        // Exactly one user message must have been persisted.
2904        assert_eq!(history.len(), 1);
2905        assert_eq!(history[0].role, Role::User);
2906
2907        // It must contain is_error=true ToolResult for each tool call ID.
2908        for tc in &tool_calls {
2909            assert!(
2910                history[0].parts.iter().any(|p| matches!(
2911                    p,
2912                    MessagePart::ToolResult { tool_use_id, is_error, .. }
2913                        if tool_use_id == &tc.id && *is_error
2914                )),
2915                "tombstone ToolResult for {} must be present and is_error=true",
2916                tc.id
2917            );
2918        }
2919    }
2920
2921    // ---- Integration tests for the #2529 fix: soft-delete of legacy-content orphans ----
2922
2923    /// #2529 regression: orphaned assistant `ToolUse` + user `ToolResult` pair where BOTH messages
2924    /// have content consisting solely of legacy tool bracket strings (no human-readable text).
2925    ///
2926    /// Before the fix, `content.trim().is_empty()` returned false for these messages, so they
2927    /// were never soft-deleted and the WARN log repeated on every session restart.
2928    ///
2929    /// After the fix, `has_meaningful_content` returns false for legacy-only content, so both
2930    /// orphaned messages are soft-deleted (non-null `deleted_at`) in `SQLite`.
2931    #[tokio::test]
2932    async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
2933        use zeph_llm::provider::MessagePart;
2934
2935        let provider = mock_provider(vec![]);
2936        let channel = MockChannel::new(vec![]);
2937        let registry = create_test_registry();
2938        let executor = MockToolExecutor::no_tools();
2939
2940        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2941        let cid = memory.sqlite().create_conversation().await.unwrap();
2942        let sqlite = memory.sqlite();
2943
2944        // A normal user message that anchors the conversation.
2945        sqlite
2946            .save_message(cid, "user", "save this for me")
2947            .await
2948            .unwrap();
2949
2950        // Orphaned assistant[ToolUse]: content is ONLY a legacy tool tag — no matching
2951        // ToolResult follows. This is the exact pattern that triggered #2529.
2952        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2953            id: "call_2529".to_string(),
2954            name: "memory_save".to_string(),
2955            input: serde_json::json!({"content": "save this"}),
2956        }])
2957        .unwrap();
2958        let orphan_assistant_id = sqlite
2959            .save_message_with_parts(
2960                cid,
2961                "assistant",
2962                "[tool_use: memory_save(call_2529)]",
2963                &use_parts,
2964            )
2965            .await
2966            .unwrap();
2967
2968        // Orphaned user[ToolResult]: content is ONLY a legacy tool tag + body.
2969        // Its tool_use_id ("call_2529") does not match any ToolUse in the preceding assistant
2970        // message in this position (will be made orphaned by inserting a plain assistant message
2971        // between them to break the pair).
2972        sqlite
2973            .save_message(cid, "assistant", "here is a plain reply")
2974            .await
2975            .unwrap();
2976
2977        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2978            tool_use_id: "call_2529".to_string(),
2979            content: "saved".to_string(),
2980            is_error: false,
2981        }])
2982        .unwrap();
2983        let orphan_user_id = sqlite
2984            .save_message_with_parts(
2985                cid,
2986                "user",
2987                "[tool_result: call_2529]\nsaved",
2988                &result_parts,
2989            )
2990            .await
2991            .unwrap();
2992
2993        // Final plain message so history doesn't end on the orphan.
2994        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2995
2996        let memory_arc = std::sync::Arc::new(memory);
2997        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2998            memory_arc.clone(),
2999            cid,
3000            50,
3001            5,
3002            100,
3003        );
3004
3005        agent.load_history().await.unwrap();
3006
3007        // Verify that both orphaned messages now have `deleted_at IS NOT NULL` in SQLite.
3008        // COUNT(*) WHERE deleted_at IS NOT NULL returns 1 if soft-deleted, 0 otherwise.
3009        let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
3010            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3011        )
3012        .bind(orphan_assistant_id)
3013        .fetch_all(memory_arc.sqlite().pool())
3014        .await
3015        .unwrap();
3016
3017        let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
3018            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
3019        )
3020        .bind(orphan_user_id)
3021        .fetch_all(memory_arc.sqlite().pool())
3022        .await
3023        .unwrap();
3024
3025        assert_eq!(
3026            assistant_deleted_count.first().copied().unwrap_or(0),
3027            1,
3028            "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3029        );
3030        assert_eq!(
3031            user_deleted_count.first().copied().unwrap_or(0),
3032            1,
3033            "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
3034        );
3035    }
3036
3037    /// #2529 idempotency: after soft-delete on first `load_history`, a second call must not
3038    /// re-load the soft-deleted orphans. This ensures the WARN log does not repeat on the
3039    /// next session startup for the same orphaned messages.
3040    #[tokio::test]
3041    async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3042        use zeph_llm::provider::MessagePart;
3043
3044        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3045        let cid = memory.sqlite().create_conversation().await.unwrap();
3046        let sqlite = memory.sqlite();
3047
3048        // Normal anchor message.
3049        sqlite
3050            .save_message(cid, "user", "do something")
3051            .await
3052            .unwrap();
3053
3054        // Orphaned assistant[ToolUse] with legacy-only content.
3055        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3056            id: "call_idem".to_string(),
3057            name: "shell".to_string(),
3058            input: serde_json::json!({"command": "ls"}),
3059        }])
3060        .unwrap();
3061        sqlite
3062            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3063            .await
3064            .unwrap();
3065
3066        // Break the pair: insert a plain assistant message so the ToolUse is mid-history orphan.
3067        sqlite
3068            .save_message(cid, "assistant", "continuing")
3069            .await
3070            .unwrap();
3071
3072        // Orphaned user[ToolResult] with legacy-only content.
3073        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3074            tool_use_id: "call_idem".to_string(),
3075            content: "output".to_string(),
3076            is_error: false,
3077        }])
3078        .unwrap();
3079        sqlite
3080            .save_message_with_parts(
3081                cid,
3082                "user",
3083                "[tool_result: call_idem]\noutput",
3084                &result_parts,
3085            )
3086            .await
3087            .unwrap();
3088
3089        sqlite
3090            .save_message(cid, "assistant", "final")
3091            .await
3092            .unwrap();
3093
3094        let memory_arc = std::sync::Arc::new(memory);
3095
3096        // First session: load_history performs soft-delete of the orphaned pair.
3097        let mut agent1 = Agent::new(
3098            mock_provider(vec![]),
3099            MockChannel::new(vec![]),
3100            create_test_registry(),
3101            None,
3102            5,
3103            MockToolExecutor::no_tools(),
3104        )
3105        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3106        agent1.load_history().await.unwrap();
3107        let count_after_first = agent1.msg.messages.len();
3108
3109        // Second session: a fresh agent loading the same conversation must not see the
3110        // soft-deleted orphans — the WARN log must not repeat.
3111        let mut agent2 = Agent::new(
3112            mock_provider(vec![]),
3113            MockChannel::new(vec![]),
3114            create_test_registry(),
3115            None,
3116            5,
3117            MockToolExecutor::no_tools(),
3118        )
3119        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3120        agent2.load_history().await.unwrap();
3121        let count_after_second = agent2.msg.messages.len();
3122
3123        // Both sessions must load the same number of messages — soft-deleted orphans excluded.
3124        assert_eq!(
3125            count_after_first, count_after_second,
3126            "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3127        );
3128    }
3129
3130    /// Edge case for #2529: an orphaned assistant message whose content has BOTH meaningful text
3131    /// AND a `tool_use` tag must NOT be soft-deleted. The `ToolUse` parts are stripped but the
3132    /// message is kept because it has human-readable content.
3133    #[tokio::test]
3134    async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3135        use zeph_llm::provider::MessagePart;
3136
3137        let provider = mock_provider(vec![]);
3138        let channel = MockChannel::new(vec![]);
3139        let registry = create_test_registry();
3140        let executor = MockToolExecutor::no_tools();
3141
3142        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3143        let cid = memory.sqlite().create_conversation().await.unwrap();
3144        let sqlite = memory.sqlite();
3145
3146        // Normal first exchange.
3147        sqlite
3148            .save_message(cid, "user", "check the files")
3149            .await
3150            .unwrap();
3151
3152        // Assistant message: has BOTH meaningful text AND a ToolUse part.
3153        // Content contains real prose + legacy tag — has_meaningful_content must return true.
3154        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3155            id: "call_mixed".to_string(),
3156            name: "shell".to_string(),
3157            input: serde_json::json!({"command": "ls"}),
3158        }])
3159        .unwrap();
3160        sqlite
3161            .save_message_with_parts(
3162                cid,
3163                "assistant",
3164                "Let me list the directory. [tool_use: shell(call_mixed)]",
3165                &use_parts,
3166            )
3167            .await
3168            .unwrap();
3169
3170        // No matching ToolResult follows — the ToolUse is orphaned.
3171        sqlite.save_message(cid, "user", "thanks").await.unwrap();
3172        sqlite
3173            .save_message(cid, "assistant", "you are welcome")
3174            .await
3175            .unwrap();
3176
3177        let memory_arc = std::sync::Arc::new(memory);
3178        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3179            memory_arc.clone(),
3180            cid,
3181            50,
3182            5,
3183            100,
3184        );
3185
3186        let messages_before = agent.msg.messages.len();
3187        agent.load_history().await.unwrap();
3188
3189        // All 4 messages must be present — the mixed-content assistant message is KEPT.
3190        assert_eq!(
3191            agent.msg.messages.len(),
3192            messages_before + 4,
3193            "assistant message with text + tool tag must not be removed after ToolUse strip"
3194        );
3195
3196        // The mixed-content assistant message must have its ToolUse parts stripped.
3197        let mixed_msg = agent
3198            .msg
3199            .messages
3200            .iter()
3201            .find(|m| m.content.contains("Let me list the directory"))
3202            .expect("mixed-content assistant message must still be in history");
3203        assert!(
3204            !mixed_msg
3205                .parts
3206                .iter()
3207                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3208            "orphaned ToolUse parts must be stripped even when message has meaningful text"
3209        );
3210        assert_eq!(
3211            mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3212            "content field must be unchanged — only parts are stripped"
3213        );
3214    }
3215
3216    // ── [skipped]/[stopped] ToolResult embedding guard (#2620) ──────────────
3217
3218    #[tokio::test]
3219    async fn persist_message_skipped_tool_result_does_not_embed() {
3220        use zeph_llm::provider::MessagePart;
3221
3222        let provider = mock_provider(vec![]);
3223        let channel = MockChannel::new(vec![]);
3224        let registry = create_test_registry();
3225        let executor = MockToolExecutor::no_tools();
3226
3227        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3228        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3229        let cid = memory.sqlite().create_conversation().await.unwrap();
3230
3231        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3232            .with_metrics(tx)
3233            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3234        agent.services.memory.persistence.autosave_assistant = true;
3235        agent.services.memory.persistence.autosave_min_length = 0;
3236
3237        let parts = vec![MessagePart::ToolResult {
3238            tool_use_id: "tu1".into(),
3239            content: "[skipped] bash tool was blocked by utility gate".into(),
3240            is_error: false,
3241        }];
3242
3243        agent
3244            .persist_message(
3245                Role::User,
3246                "[skipped] bash tool was blocked by utility gate",
3247                &parts,
3248                false,
3249            )
3250            .await;
3251
3252        assert_eq!(
3253            rx.borrow().embeddings_generated,
3254            0,
3255            "[skipped] ToolResult must not be embedded into Qdrant"
3256        );
3257    }
3258
3259    #[tokio::test]
3260    async fn persist_message_stopped_tool_result_does_not_embed() {
3261        use zeph_llm::provider::MessagePart;
3262
3263        let provider = mock_provider(vec![]);
3264        let channel = MockChannel::new(vec![]);
3265        let registry = create_test_registry();
3266        let executor = MockToolExecutor::no_tools();
3267
3268        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3269        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3270        let cid = memory.sqlite().create_conversation().await.unwrap();
3271
3272        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3273            .with_metrics(tx)
3274            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3275        agent.services.memory.persistence.autosave_assistant = true;
3276        agent.services.memory.persistence.autosave_min_length = 0;
3277
3278        let parts = vec![MessagePart::ToolResult {
3279            tool_use_id: "tu2".into(),
3280            content: "[stopped] execution limit reached".into(),
3281            is_error: false,
3282        }];
3283
3284        agent
3285            .persist_message(
3286                Role::User,
3287                "[stopped] execution limit reached",
3288                &parts,
3289                false,
3290            )
3291            .await;
3292
3293        assert_eq!(
3294            rx.borrow().embeddings_generated,
3295            0,
3296            "[stopped] ToolResult must not be embedded into Qdrant"
3297        );
3298    }
3299
3300    #[tokio::test]
3301    async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3302        // Regression: a normal ToolResult (no [skipped]/[stopped] prefix) must not be
3303        // suppressed by the utility-gate guard and must reach the save path (SQLite).
3304        use zeph_llm::provider::MessagePart;
3305
3306        let provider = mock_provider(vec![]);
3307        let channel = MockChannel::new(vec![]);
3308        let registry = create_test_registry();
3309        let executor = MockToolExecutor::no_tools();
3310
3311        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3312        let cid = memory.sqlite().create_conversation().await.unwrap();
3313        let memory_arc = std::sync::Arc::new(memory);
3314
3315        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3316            memory_arc.clone(),
3317            cid,
3318            50,
3319            5,
3320            100,
3321        );
3322        agent.services.memory.persistence.autosave_assistant = true;
3323        agent.services.memory.persistence.autosave_min_length = 0;
3324
3325        let content = "total 42\ndrwxr-xr-x  5 user group";
3326        let parts = vec![MessagePart::ToolResult {
3327            tool_use_id: "tu3".into(),
3328            content: content.into(),
3329            is_error: false,
3330        }];
3331
3332        agent
3333            .persist_message(Role::User, content, &parts, false)
3334            .await;
3335
3336        // Must be saved to SQLite — confirms the guard did not block this path.
3337        let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3338        assert_eq!(
3339            history.len(),
3340            1,
3341            "normal ToolResult must be saved to SQLite"
3342        );
3343        assert_eq!(history[0].content, content);
3344    }
3345
3346    /// Verify that `enqueue_trajectory_extraction_task` uses a bounded tail slice instead of
3347    /// cloning the full message vec. We confirm the slice logic by checking that the
3348    /// `tail_start` calculation correctly bounds the window even with more messages than
3349    /// `max_messages`.
3350    #[test]
3351    fn trajectory_extraction_slice_bounds_messages() {
3352        // Replicate the slice logic from enqueue_trajectory_extraction_task.
3353        let max_messages: usize = 20;
3354        let total_messages = 100usize;
3355
3356        let tail_start = total_messages.saturating_sub(max_messages);
3357        let window = total_messages - tail_start;
3358
3359        assert_eq!(
3360            window, 20,
3361            "slice should contain exactly max_messages items"
3362        );
3363        assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3364    }
3365
3366    #[test]
3367    fn trajectory_extraction_slice_handles_few_messages() {
3368        let max_messages: usize = 20;
3369        let total_messages = 5usize;
3370
3371        let tail_start = total_messages.saturating_sub(max_messages);
3372        let window = total_messages - tail_start;
3373
3374        assert_eq!(window, 5, "should return all messages when fewer than max");
3375        assert_eq!(tail_start, 0, "slice should start from the beginning");
3376    }
3377
3378    // --- #3168 regression tests ---
3379
3380    /// Round-trip: persist `Assistant[tool_use]` + `User[tool_result]`, then `load_history`.
3381    /// After `sanitize_tool_pairs` the pair must be intact — no WARN, both messages present
3382    /// with non-empty parts.
3383    #[tokio::test]
3384    async fn regression_3168_complete_tool_pair_survives_round_trip() {
3385        use zeph_llm::provider::MessagePart;
3386
3387        let provider = mock_provider(vec![]);
3388        let channel = MockChannel::new(vec![]);
3389        let registry = create_test_registry();
3390        let executor = MockToolExecutor::no_tools();
3391
3392        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3393        let cid = memory.sqlite().create_conversation().await.unwrap();
3394        let sqlite = memory.sqlite();
3395
3396        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3397            id: "r3168_call".to_string(),
3398            name: "shell".to_string(),
3399            input: serde_json::json!({"command": "echo hi"}),
3400        }])
3401        .unwrap();
3402        sqlite
3403            .save_message_with_parts(
3404                cid,
3405                "assistant",
3406                "[tool_use: shell(r3168_call)]",
3407                &use_parts,
3408            )
3409            .await
3410            .unwrap();
3411
3412        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3413            tool_use_id: "r3168_call".to_string(),
3414            content: "[skipped]".to_string(),
3415            is_error: false,
3416        }])
3417        .unwrap();
3418        sqlite
3419            .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3420            .await
3421            .unwrap();
3422
3423        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3424            std::sync::Arc::new(memory),
3425            cid,
3426            50,
3427            5,
3428            100,
3429        );
3430
3431        let base = agent.msg.messages.len();
3432        agent.load_history().await.unwrap();
3433
3434        assert_eq!(
3435            agent.msg.messages.len(),
3436            base + 2,
3437            "both messages of the complete pair must survive load_history"
3438        );
3439
3440        let assistant_msg = agent
3441            .msg
3442            .messages
3443            .iter()
3444            .find(|m| m.role == Role::Assistant)
3445            .expect("assistant message missing after load_history");
3446        assert!(
3447            assistant_msg
3448                .parts
3449                .iter()
3450                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3451            "ToolUse part must be preserved in assistant message"
3452        );
3453
3454        let user_msg = agent
3455            .msg
3456            .messages
3457            .iter()
3458            .rev()
3459            .find(|m| m.role == Role::User)
3460            .expect("user message missing after load_history");
3461        assert!(
3462            user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3463            "ToolResult part must be preserved in user message"
3464        );
3465    }
3466
3467    /// If parts serialization fails, `persist_message` must return early and not store
3468    /// a row with empty parts that would create an orphaned `tool_use` on next session load.
3469    /// We verify this by writing a row with invalid `parts_json` directly and confirming
3470    /// `load_history` skips it (empty parts row is not injected into the agent).
3471    #[tokio::test]
3472    async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3473        use zeph_llm::provider::MessagePart;
3474
3475        let provider = mock_provider(vec![]);
3476        let channel = MockChannel::new(vec![]);
3477        let registry = create_test_registry();
3478        let executor = MockToolExecutor::no_tools();
3479
3480        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3481        let cid = memory.sqlite().create_conversation().await.unwrap();
3482        let sqlite = memory.sqlite();
3483
3484        // Simulate the pre-fix bug: assistant message stored with empty parts_json "[]"
3485        // even though it should have had a ToolUse part. This is what persist_message used
3486        // to do before the early-return fix.
3487        sqlite
3488            .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3489            .await
3490            .unwrap();
3491
3492        // User message with the matching tool_result stored correctly.
3493        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3494            tool_use_id: "corrupt".to_string(),
3495            content: "result".to_string(),
3496            is_error: false,
3497        }])
3498        .unwrap();
3499        sqlite
3500            .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3501            .await
3502            .unwrap();
3503
3504        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3505            std::sync::Arc::new(memory),
3506            cid,
3507            50,
3508            5,
3509            100,
3510        );
3511
3512        let base = agent.msg.messages.len();
3513        agent.load_history().await.unwrap();
3514
3515        // The user ToolResult has no matching ToolUse in the assistant message (parts="[]"),
3516        // so sanitize_tool_pairs must strip the orphaned ToolResult.
3517        // Net result: only the content-only assistant message survives; user msg is removed
3518        // because after stripping its only part it becomes empty.
3519        let loaded = agent.msg.messages.len() - base;
3520        // No message injected with orphaned ToolResult parts — either stripped entirely or
3521        // the ToolResult part removed. Verify no user message with ToolResult remains.
3522        let orphan_present = agent.msg.messages.iter().any(|m| {
3523            m.role == Role::User
3524                && m.parts.iter().any(|p| {
3525                    matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3526                })
3527        });
3528        assert!(
3529            !orphan_present,
3530            "orphaned ToolResult must not survive load_history; loaded={loaded}"
3531        );
3532    }
3533}