Skip to main content

zeph_core/agent/
persistence.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::channel::Channel;
5use zeph_agent_persistence::graph::{build_graph_extraction_config, collect_context_messages};
6use zeph_agent_persistence::{
7    MemoryPersistenceView, MetricsView, PersistMessageRequest, PersistenceService, SecurityView,
8};
9use zeph_llm::provider::{LlmProvider as _, MessagePart, Role};
10
11use super::Agent;
12
13impl<C: Channel> Agent<C> {
14    /// Load conversation history from memory and inject into messages.
15    ///
16    /// Delegates to [`PersistenceService::load_history`]. Post-load operations that touch
17    /// agent-internal singletons (session count increment, semantic fact count recompute,
18    /// token recompute) remain in this shim because they access fields outside the
19    /// borrow-lens view.
20    ///
21    /// # Errors
22    ///
23    /// Returns an error if loading history from `SQLite` fails.
24    ///
25    /// # Panics
26    ///
27    /// Does not panic. The internal `unwrap_or(0)` conversions are on fallible `i64 → usize`
28    /// casts that saturate to zero on overflow; they cannot panic.
29    pub async fn load_history(&mut self) -> Result<(), super::error::AgentError> {
30        let (Some(memory), Some(cid)) = (
31            self.services.memory.persistence.memory.as_ref(),
32            self.services.memory.persistence.conversation_id,
33        ) else {
34            return Ok(());
35        };
36
37        // Clone so we can call methods after the borrow-lens view is dropped.
38        let memory = memory.clone();
39
40        let mut unsummarized = self.services.memory.persistence.unsummarized_count;
41        // `memory_view` is not `mut` — the `&mut unsummarized` inside is established at
42        // construction and passed as `&memory_view` to load_history (shared borrow).
43        let memory_view = MemoryPersistenceView {
44            memory: Some(&memory),
45            conversation_id: self.services.memory.persistence.conversation_id,
46            autosave_assistant: self.services.memory.persistence.autosave_assistant,
47            autosave_min_length: self.services.memory.persistence.autosave_min_length,
48            unsummarized_count: &mut unsummarized,
49            goal_text: self.services.memory.extraction.goal_text.clone(),
50        };
51        let mut sqlite_delta = 0u64;
52        let mut embed_delta = 0u64;
53        let mut guard_delta = 0u64;
54        let mut metrics_view = MetricsView {
55            sqlite_message_count: &mut sqlite_delta,
56            embeddings_generated: &mut embed_delta,
57            exfiltration_memory_guards: &mut guard_delta,
58        };
59
60        let svc = PersistenceService::new();
61        let outcome = svc
62            .load_history(
63                &mut self.msg.messages,
64                &mut self.msg.last_persisted_message_id,
65                &mut self.msg.deferred_db_hide_ids,
66                &mut self.msg.deferred_db_summaries,
67                &memory_view,
68                &zeph_config::Config::default(),
69                &mut metrics_view,
70            )
71            .await
72            .map_err(|e| {
73                super::error::AgentError::Memory(zeph_memory::MemoryError::Other(e.to_string()))
74            })?;
75
76        // Write back lens-borrowed local to the field.
77        self.services.memory.persistence.unsummarized_count = unsummarized;
78
79        if outcome.messages_loaded > 0 {
80            // Increment session counts so tier promotion can track cross-session access.
81            let _ = memory
82                .sqlite()
83                .increment_session_counts_for_conversation(cid)
84                .await
85                .inspect_err(|e| {
86                    tracing::warn!(error = %e, "failed to increment tier session counts");
87                });
88        }
89
90        // Set absolute SQLite message count and semantic fact count (not deltas).
91        self.update_metrics(|m| {
92            m.sqlite_message_count = outcome.sqlite_total_messages;
93        });
94        if let Ok(count) = memory.sqlite().count_semantic_facts().await {
95            let count_u64 = u64::try_from(count).unwrap_or(0);
96            self.update_metrics(|m| {
97                m.semantic_fact_count = count_u64;
98            });
99        }
100        if let Ok(count) = memory.unsummarized_message_count(cid).await {
101            self.services.memory.persistence.unsummarized_count =
102                usize::try_from(count).unwrap_or(0);
103        }
104
105        self.recompute_prompt_tokens();
106        Ok(())
107    }
108
109    /// Persist a message to memory.
110    ///
111    /// `has_injection_flags` controls whether Qdrant embedding is skipped for this message.
112    /// When `true` and `guard_memory_writes` is enabled, only `SQLite` is written — the message
113    /// is saved for conversation continuity but will not pollute semantic search (M2, D2).
114    #[cfg_attr(
115        feature = "profiling",
116        tracing::instrument(name = "agent.persist_message", skip_all)
117    )]
118    pub(crate) async fn persist_message(
119        &mut self,
120        role: Role,
121        content: &str,
122        parts: &[MessagePart],
123        has_injection_flags: bool,
124    ) {
125        // M2: call should_guard_memory_write for its diagnostic side effects (tracing + security
126        // event). The bool result is passed into SecurityView so the service can decide whether
127        // to skip Qdrant embedding.
128        let guard_event = self
129            .services
130            .security
131            .exfiltration_guard
132            .should_guard_memory_write(has_injection_flags);
133        if let Some(ref event) = guard_event {
134            tracing::warn!(
135                ?event,
136                "exfiltration guard: skipping Qdrant embedding for flagged content"
137            );
138            self.push_security_event(
139                zeph_common::SecurityEventCategory::ExfiltrationBlock,
140                "memory_write",
141                "Qdrant embedding skipped: flagged content",
142            );
143        }
144
145        let req = PersistMessageRequest::from_borrowed(role, content, parts, has_injection_flags);
146
147        let mut unsummarized = self.services.memory.persistence.unsummarized_count;
148        let memory_arc = self.services.memory.persistence.memory.clone();
149        let mut memory_view = MemoryPersistenceView {
150            memory: memory_arc.as_ref(),
151            conversation_id: self.services.memory.persistence.conversation_id,
152            autosave_assistant: self.services.memory.persistence.autosave_assistant,
153            autosave_min_length: self.services.memory.persistence.autosave_min_length,
154            unsummarized_count: &mut unsummarized,
155            goal_text: self.services.memory.extraction.goal_text.clone(),
156        };
157        let security = SecurityView {
158            guard_memory_writes: guard_event.is_some(),
159            _phantom: std::marker::PhantomData,
160        };
161        let mut sqlite_delta = 0u64;
162        let mut embed_delta = 0u64;
163        let mut guard_delta = 0u64;
164        let mut metrics_view = MetricsView {
165            sqlite_message_count: &mut sqlite_delta,
166            embeddings_generated: &mut embed_delta,
167            exfiltration_memory_guards: &mut guard_delta,
168        };
169
170        let svc = PersistenceService::new();
171        let outcome = svc
172            .persist_message(
173                req,
174                &mut self.msg.last_persisted_message_id,
175                &mut memory_view,
176                &security,
177                &zeph_config::Config::default(),
178                &mut metrics_view,
179            )
180            .await;
181
182        // Write back the unsummarized counter (lens borrowed a local copy).
183        self.services.memory.persistence.unsummarized_count = unsummarized;
184
185        // Forward metric deltas through the watch broadcast.
186        self.update_metrics(|m| {
187            m.sqlite_message_count += sqlite_delta;
188            m.embeddings_generated += embed_delta;
189            // guard_delta is already tracked via push_security_event above.
190            m.exfiltration_memory_guards += guard_delta;
191        });
192
193        if outcome.message_id.is_none() {
194            return;
195        }
196
197        // Phase 2: enqueue enrichment tasks via supervisor (non-blocking).
198        // check_summarization signals completion via SummarizationSignal, consumed in reap()
199        // between turns — no shared mutable state across tasks (S1 fix).
200        self.enqueue_summarization_task();
201
202        // FIX-1: skip graph extraction for tool result messages — they contain raw structured
203        // output (TOML, JSON, code) that pollutes the entity graph with noise.
204        let has_tool_result_parts = parts
205            .iter()
206            .any(|p| matches!(p, MessagePart::ToolResult { .. }));
207
208        self.enqueue_graph_extraction_task(content, has_injection_flags, has_tool_result_parts)
209            .await;
210
211        // Persona extraction: run only for user messages that are not tool results and not injected.
212        if role == Role::User && !has_tool_result_parts && !has_injection_flags {
213            self.enqueue_persona_extraction_task();
214        }
215
216        // Trajectory extraction: run after turns that contained tool results.
217        if has_tool_result_parts {
218            self.enqueue_trajectory_extraction_task();
219        }
220
221        // ReasoningBank distillation: runs only after the final assistant message of a turn
222        // (C2 fix: skip intermediate tool-call messages). A message with ToolUse parts is an
223        // intermediate step; the final assistant message has no ToolUse parts.
224        // S-Med1: skip if injection patterns detected — mirrors graph extraction guard.
225        let has_tool_use_parts = parts
226            .iter()
227            .any(|p| matches!(p, MessagePart::ToolUse { .. }));
228        if role == Role::Assistant && !has_tool_use_parts && !has_injection_flags {
229            self.enqueue_reasoning_extraction_task();
230        }
231    }
232
233    /// Enqueue background summarization via the supervisor (S1 fix: no shared `AtomicUsize`).
234    fn enqueue_summarization_task(&mut self) {
235        let (Some(memory), Some(cid)) = (
236            self.services.memory.persistence.memory.clone(),
237            self.services.memory.persistence.conversation_id,
238        ) else {
239            return;
240        };
241
242        if self.services.memory.persistence.unsummarized_count
243            <= self.services.memory.compaction.summarization_threshold
244        {
245            return;
246        }
247
248        let batch_size = self.services.memory.compaction.summarization_threshold / 2;
249
250        self.runtime.lifecycle.supervisor.spawn_summarization("summarization", async move {
251            match tokio::time::timeout(
252                std::time::Duration::from_secs(30),
253                memory.summarize(cid, batch_size),
254            )
255            .await
256            {
257                Ok(Ok(Some(summary_id))) => {
258                    tracing::info!(
259                        "background summarization: created summary {summary_id} for conversation {cid}"
260                    );
261                    true
262                }
263                Ok(Ok(None)) => {
264                    tracing::debug!("background summarization: no summarization needed");
265                    false
266                }
267                Ok(Err(e)) => {
268                    tracing::error!("background summarization failed: {e:#}");
269                    false
270                }
271                Err(_) => {
272                    tracing::warn!("background summarization timed out after 30s");
273                    false
274                }
275            }
276        });
277    }
278
279    /// Prepare graph extraction guards in foreground, then enqueue heavy work via supervisor.
280    ///
281    /// Guards (enabled check, injection/tool-result skip) stay on the foreground path.
282    /// The RPE check and actual extraction run in background (S2: no `send_status`).
283    async fn enqueue_graph_extraction_task(
284        &mut self,
285        content: &str,
286        has_injection_flags: bool,
287        has_tool_result_parts: bool,
288    ) {
289        if self.services.memory.persistence.memory.is_none()
290            || self.services.memory.persistence.conversation_id.is_none()
291        {
292            return;
293        }
294        if has_tool_result_parts {
295            tracing::debug!("graph extraction skipped: message contains ToolResult parts");
296            return;
297        }
298        if has_injection_flags {
299            tracing::warn!("graph extraction skipped: injection patterns detected in content");
300            return;
301        }
302
303        let cfg = &self.services.memory.extraction.graph_config;
304        if !cfg.enabled {
305            return;
306        }
307        let extraction_cfg = build_graph_extraction_config(
308            cfg,
309            self.services
310                .memory
311                .persistence
312                .conversation_id
313                .map(|c| c.0),
314        );
315
316        // RPE check: embed + compute surprise score. Stays on foreground to avoid
317        // capturing the rpe_router mutex in a background task.
318        if self.rpe_should_skip(content).await {
319            tracing::debug!("D-MEM RPE: low-surprise turn, skipping graph extraction");
320            return;
321        }
322
323        let context_messages = collect_context_messages(&self.msg.messages);
324
325        let Some(memory) = self.services.memory.persistence.memory.clone() else {
326            return;
327        };
328
329        let validator: zeph_memory::semantic::PostExtractValidator =
330            if self.services.security.memory_validator.is_enabled() {
331                let v = self.services.security.memory_validator.clone();
332                Some(Box::new(move |result| {
333                    v.validate_graph_extraction(result)
334                        .map_err(|e| e.to_string())
335                }))
336            } else {
337                None
338            };
339
340        self.spawn_graph_extraction_task(
341            memory,
342            content,
343            context_messages,
344            extraction_cfg,
345            validator,
346        );
347
348        // Sync community failures and extraction metrics (cheap, foreground-safe).
349        self.sync_community_detection_failures();
350        self.sync_graph_extraction_metrics();
351        self.enqueue_graph_count_sync_task();
352    }
353
354    fn spawn_graph_extraction_task(
355        &mut self,
356        memory: std::sync::Arc<zeph_memory::semantic::SemanticMemory>,
357        content: &str,
358        context_messages: Vec<String>,
359        extraction_cfg: zeph_memory::semantic::GraphExtractionConfig,
360        validator: zeph_memory::semantic::PostExtractValidator,
361    ) {
362        let content_owned = content.to_owned();
363        let graph_store = memory.graph_store.clone();
364        let metrics_tx = self.runtime.metrics.metrics_tx.clone();
365        let start_time = self.runtime.lifecycle.start_time;
366
367        self.runtime.lifecycle.supervisor.spawn(
368            super::agent_supervisor::TaskClass::Enrichment,
369            "graph_extraction",
370            async move {
371                let extraction_handle = memory.spawn_graph_extraction(
372                    content_owned,
373                    context_messages,
374                    extraction_cfg,
375                    validator,
376                );
377
378                // After extraction completes, refresh graph count metrics.
379                if let (Some(store), Some(tx)) = (graph_store, metrics_tx) {
380                    let _ = extraction_handle.await;
381                    let (entities, edges, communities) = tokio::join!(
382                        store.entity_count(),
383                        store.active_edge_count(),
384                        store.community_count()
385                    );
386                    let elapsed = start_time.elapsed().as_secs();
387                    tx.send_modify(|m| {
388                        m.uptime_seconds = elapsed;
389                        m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
390                        m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
391                        m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
392                    });
393                } else {
394                    let _ = extraction_handle.await;
395                }
396
397                tracing::debug!("background graph extraction complete");
398            },
399        );
400    }
401
402    // sync_graph_counts and sync_guidelines_status are DB reads; enqueued as Telemetry background.
403    fn enqueue_graph_count_sync_task(&mut self) {
404        let memory_for_sync = self.services.memory.persistence.memory.clone();
405        let metrics_tx_sync = self.runtime.metrics.metrics_tx.clone();
406        let start_time_sync = self.runtime.lifecycle.start_time;
407        let cid_sync = self.services.memory.persistence.conversation_id;
408        let graph_store_sync = memory_for_sync.as_ref().and_then(|m| m.graph_store.clone());
409        let sqlite_sync = memory_for_sync.as_ref().map(|m| m.sqlite().clone());
410        let guidelines_enabled = self.services.memory.extraction.graph_config.enabled;
411
412        self.runtime.lifecycle.supervisor.spawn(
413            super::agent_supervisor::TaskClass::Telemetry,
414            "graph_count_sync",
415            async move {
416                let Some(store) = graph_store_sync else {
417                    return;
418                };
419                let Some(tx) = metrics_tx_sync else { return };
420
421                let (entities, edges, communities) = tokio::join!(
422                    store.entity_count(),
423                    store.active_edge_count(),
424                    store.community_count()
425                );
426                let elapsed = start_time_sync.elapsed().as_secs();
427                tx.send_modify(|m| {
428                    m.uptime_seconds = elapsed;
429                    m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
430                    m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
431                    m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
432                });
433
434                // Sync guidelines status.
435                if guidelines_enabled && let Some(sqlite) = sqlite_sync {
436                    match tokio::time::timeout(
437                        std::time::Duration::from_secs(10),
438                        sqlite.load_compression_guidelines_meta(cid_sync),
439                    )
440                    .await
441                    {
442                        Ok(Ok((version, created_at))) => {
443                            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
444                            let version_u32 = u32::try_from(version).unwrap_or(0);
445                            tx.send_modify(|m| {
446                                m.guidelines_version = version_u32;
447                                m.guidelines_updated_at = created_at;
448                            });
449                        }
450                        Ok(Err(e)) => {
451                            tracing::debug!("guidelines status sync failed: {e:#}");
452                        }
453                        Err(_) => {
454                            tracing::debug!("guidelines status sync timed out");
455                        }
456                    }
457                }
458            },
459        );
460    }
461
462    /// Enqueue persona extraction via supervisor (background, no `send_status`).
463    fn enqueue_persona_extraction_task(&mut self) {
464        use zeph_memory::semantic::{PersonaExtractionConfig, extract_persona_facts};
465
466        let cfg = &self.services.memory.extraction.persona_config;
467        if !cfg.enabled {
468            return;
469        }
470
471        let Some(memory) = &self.services.memory.persistence.memory else {
472            return;
473        };
474
475        let user_messages: Vec<String> = self
476            .msg
477            .messages
478            .iter()
479            .filter(|m| {
480                m.role == Role::User
481                    && !m
482                        .parts
483                        .iter()
484                        .any(|p| matches!(p, MessagePart::ToolResult { .. }))
485            })
486            .take(8)
487            .map(|m| {
488                if m.content.len() > 2048 {
489                    m.content[..m.content.floor_char_boundary(2048)].to_owned()
490                } else {
491                    m.content.clone()
492                }
493            })
494            .collect();
495
496        if user_messages.len() < cfg.min_messages {
497            return;
498        }
499
500        let timeout_secs = cfg.extraction_timeout_secs;
501        let extraction_cfg = PersonaExtractionConfig {
502            enabled: cfg.enabled,
503            min_messages: cfg.min_messages,
504            max_messages: cfg.max_messages,
505            extraction_timeout_secs: timeout_secs,
506        };
507
508        let provider = self.resolve_background_provider(cfg.persona_provider.as_str());
509        let store = memory.sqlite().clone();
510        let conversation_id = self
511            .services
512            .memory
513            .persistence
514            .conversation_id
515            .map(|c| c.0);
516
517        self.runtime.lifecycle.supervisor.spawn(
518            super::agent_supervisor::TaskClass::Enrichment,
519            "persona_extraction",
520            async move {
521                let user_message_refs: Vec<&str> =
522                    user_messages.iter().map(String::as_str).collect();
523                let fut = extract_persona_facts(
524                    &store,
525                    &provider,
526                    &user_message_refs,
527                    &extraction_cfg,
528                    conversation_id,
529                );
530                match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), fut).await
531                {
532                    Ok(Ok(n)) => tracing::debug!(upserted = n, "persona extraction complete"),
533                    Ok(Err(e)) => tracing::warn!(error = %e, "persona extraction failed"),
534                    Err(_) => tracing::warn!(
535                        timeout_secs,
536                        "persona extraction timed out — no facts written this turn"
537                    ),
538                }
539            },
540        );
541    }
542
543    /// Enqueue trajectory extraction via supervisor (background).
544    fn enqueue_trajectory_extraction_task(&mut self) {
545        use zeph_memory::semantic::{TrajectoryExtractionConfig, extract_trajectory_entries};
546
547        let cfg = self.services.memory.extraction.trajectory_config.clone();
548        if !cfg.enabled {
549            return;
550        }
551
552        let Some(memory) = &self.services.memory.persistence.memory else {
553            return;
554        };
555
556        let conversation_id = match self.services.memory.persistence.conversation_id {
557            Some(cid) => cid.0,
558            None => return,
559        };
560
561        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
562        let turn_messages: Vec<zeph_llm::provider::Message> =
563            self.msg.messages[tail_start..].to_vec();
564
565        if turn_messages.is_empty() {
566            return;
567        }
568
569        let extraction_cfg = TrajectoryExtractionConfig {
570            enabled: cfg.enabled,
571            max_messages: cfg.max_messages,
572            extraction_timeout_secs: cfg.extraction_timeout_secs,
573        };
574
575        let provider = self.resolve_background_provider(cfg.trajectory_provider.as_str());
576        let store = memory.sqlite().clone();
577        let min_confidence = cfg.min_confidence;
578
579        self.runtime.lifecycle.supervisor.spawn(
580            super::agent_supervisor::TaskClass::Enrichment,
581            "trajectory_extraction",
582            async move {
583                let entries =
584                    match extract_trajectory_entries(&provider, &turn_messages, &extraction_cfg)
585                        .await
586                    {
587                        Ok(e) => e,
588                        Err(e) => {
589                            tracing::warn!(error = %e, "trajectory extraction failed");
590                            return;
591                        }
592                    };
593
594                let last_id = store
595                    .trajectory_last_extracted_message_id(conversation_id)
596                    .await
597                    .unwrap_or(0);
598
599                let mut max_id = last_id;
600                for entry in &entries {
601                    if entry.confidence < min_confidence {
602                        continue;
603                    }
604                    let tools_json = serde_json::to_string(&entry.tools_used)
605                        .unwrap_or_else(|_| "[]".to_string());
606                    match store
607                        .insert_trajectory_entry(zeph_memory::NewTrajectoryEntry {
608                            conversation_id: Some(conversation_id),
609                            turn_index: 0,
610                            kind: &entry.kind,
611                            intent: &entry.intent,
612                            outcome: &entry.outcome,
613                            tools_used: &tools_json,
614                            confidence: entry.confidence,
615                        })
616                        .await
617                    {
618                        Ok(id) => {
619                            if id > max_id {
620                                max_id = id;
621                            }
622                        }
623                        Err(e) => tracing::warn!(error = %e, "failed to insert trajectory entry"),
624                    }
625                }
626
627                if max_id > last_id {
628                    let _ = store
629                        .set_trajectory_last_extracted_message_id(conversation_id, max_id)
630                        .await;
631                }
632
633                tracing::debug!(
634                    count = entries.len(),
635                    conversation_id,
636                    "trajectory extraction complete"
637                );
638            },
639        );
640    }
641
642    /// Enqueue reasoning strategy distillation via supervisor (background, fire-and-forget).
643    ///
644    /// Mirrors [`Self::enqueue_trajectory_extraction_task`]. Runs after every assistant turn
645    /// when `memory.reasoning.enabled = true` and a `ReasoningMemory` is attached.
646    fn enqueue_reasoning_extraction_task(&mut self) {
647        let cfg = self.services.memory.extraction.reasoning_config.clone();
648        if !cfg.enabled {
649            return;
650        }
651
652        let Some(memory) = &self.services.memory.persistence.memory else {
653            return;
654        };
655
656        let Some(reasoning) = memory.reasoning.clone() else {
657            return;
658        };
659
660        let tail_start = self.msg.messages.len().saturating_sub(cfg.max_messages);
661        let turn_messages: Vec<zeph_llm::provider::Message> =
662            self.msg.messages[tail_start..].to_vec();
663
664        if turn_messages.len() < cfg.min_messages {
665            return;
666        }
667
668        let extract_provider = self.resolve_background_provider(cfg.extract_provider.as_str());
669        let distill_provider = self.resolve_background_provider(cfg.distill_provider.as_str());
670        let embed_provider = memory.effective_embed_provider().clone();
671        let store_limit = cfg.store_limit;
672        let extraction_timeout = std::time::Duration::from_secs(cfg.extraction_timeout_secs);
673        let distill_timeout = std::time::Duration::from_secs(cfg.distill_timeout_secs);
674        let self_judge_window = cfg.self_judge_window;
675        let min_assistant_chars = cfg.min_assistant_chars;
676
677        self.runtime.lifecycle.supervisor.spawn(
678            super::agent_supervisor::TaskClass::Enrichment,
679            "reasoning_extraction",
680            async move {
681                if let Err(e) = zeph_memory::process_reasoning_turn(
682                    &reasoning,
683                    &extract_provider,
684                    &distill_provider,
685                    &embed_provider,
686                    &turn_messages,
687                    zeph_memory::ProcessTurnConfig {
688                        store_limit,
689                        extraction_timeout,
690                        distill_timeout,
691                        self_judge_window,
692                        min_assistant_chars,
693                    },
694                )
695                .await
696                {
697                    tracing::warn!(error = %e, "reasoning: process_turn failed");
698                }
699
700                tracing::debug!("reasoning extraction complete");
701            },
702        );
703    }
704
705    /// D-MEM RPE check: returns `true` when the current turn should skip graph extraction.
706    ///
707    /// Embeds `content`, computes RPE via the router, and updates the router state.
708    /// Returns `false` (do not skip) on any error — conservative fallback.
709    async fn rpe_should_skip(&mut self, content: &str) -> bool {
710        let Some(ref rpe_mutex) = self.services.memory.extraction.rpe_router else {
711            return false;
712        };
713        let Some(memory) = &self.services.memory.persistence.memory else {
714            return false;
715        };
716        let candidates = zeph_memory::extract_candidate_entities(content);
717        let provider = memory.provider();
718        let Ok(Ok(emb_vec)) =
719            tokio::time::timeout(std::time::Duration::from_secs(5), provider.embed(content)).await
720        else {
721            return false; // embed failed/timed out → extract
722        };
723        if let Ok(mut router) = rpe_mutex.lock() {
724            let signal = router.compute(&emb_vec, &candidates);
725            router.push_embedding(emb_vec);
726            router.push_entities(&candidates);
727            !signal.should_extract
728        } else {
729            tracing::warn!("rpe_router mutex poisoned; falling through to extract");
730            false
731        }
732    }
733}
734
735#[cfg(test)]
736mod tests {
737    use super::super::agent_tests::{
738        MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider,
739    };
740    use super::*;
741    use zeph_llm::any::AnyProvider;
742    use zeph_llm::provider::Message;
743    use zeph_memory::semantic::SemanticMemory;
744
745    async fn test_memory(provider: &AnyProvider) -> SemanticMemory {
746        SemanticMemory::new(
747            ":memory:",
748            "http://127.0.0.1:1",
749            None,
750            provider.clone(),
751            "test-model",
752        )
753        .await
754        .unwrap()
755    }
756
757    #[tokio::test]
758    async fn load_history_without_memory_returns_ok() {
759        let provider = mock_provider(vec![]);
760        let channel = MockChannel::new(vec![]);
761        let registry = create_test_registry();
762        let executor = MockToolExecutor::no_tools();
763        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
764
765        let result = agent.load_history().await;
766        assert!(result.is_ok());
767        // No messages added when no memory is configured
768        assert_eq!(agent.msg.messages.len(), 1); // system prompt only
769    }
770
771    #[tokio::test]
772    async fn load_history_with_messages_injects_into_agent() {
773        let provider = mock_provider(vec![]);
774        let channel = MockChannel::new(vec![]);
775        let registry = create_test_registry();
776        let executor = MockToolExecutor::no_tools();
777
778        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
779        let cid = memory.sqlite().create_conversation().await.unwrap();
780
781        memory
782            .sqlite()
783            .save_message(cid, "user", "hello from history")
784            .await
785            .unwrap();
786        memory
787            .sqlite()
788            .save_message(cid, "assistant", "hi back")
789            .await
790            .unwrap();
791
792        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
793            std::sync::Arc::new(memory),
794            cid,
795            50,
796            5,
797            100,
798        );
799
800        let messages_before = agent.msg.messages.len();
801        agent.load_history().await.unwrap();
802        // Two messages were added from history
803        assert_eq!(agent.msg.messages.len(), messages_before + 2);
804    }
805
806    #[tokio::test]
807    async fn load_history_skips_empty_messages() {
808        let provider = mock_provider(vec![]);
809        let channel = MockChannel::new(vec![]);
810        let registry = create_test_registry();
811        let executor = MockToolExecutor::no_tools();
812
813        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
814        let cid = memory.sqlite().create_conversation().await.unwrap();
815
816        // Save an empty message (should be skipped) and a valid one
817        memory
818            .sqlite()
819            .save_message(cid, "user", "   ")
820            .await
821            .unwrap();
822        memory
823            .sqlite()
824            .save_message(cid, "user", "real message")
825            .await
826            .unwrap();
827
828        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
829            std::sync::Arc::new(memory),
830            cid,
831            50,
832            5,
833            100,
834        );
835
836        let messages_before = agent.msg.messages.len();
837        agent.load_history().await.unwrap();
838        // Only the non-empty message is loaded
839        assert_eq!(agent.msg.messages.len(), messages_before + 1);
840    }
841
842    #[tokio::test]
843    async fn load_history_with_empty_store_returns_ok() {
844        let provider = mock_provider(vec![]);
845        let channel = MockChannel::new(vec![]);
846        let registry = create_test_registry();
847        let executor = MockToolExecutor::no_tools();
848
849        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
850        let cid = memory.sqlite().create_conversation().await.unwrap();
851
852        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
853            std::sync::Arc::new(memory),
854            cid,
855            50,
856            5,
857            100,
858        );
859
860        let messages_before = agent.msg.messages.len();
861        agent.load_history().await.unwrap();
862        // No messages added — empty history
863        assert_eq!(agent.msg.messages.len(), messages_before);
864    }
865
866    #[tokio::test]
867    async fn load_history_increments_session_count_for_existing_messages() {
868        let provider = mock_provider(vec![]);
869        let channel = MockChannel::new(vec![]);
870        let registry = create_test_registry();
871        let executor = MockToolExecutor::no_tools();
872
873        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
874        let cid = memory.sqlite().create_conversation().await.unwrap();
875
876        // Save two messages — they start with session_count = 0.
877        let id1 = memory
878            .sqlite()
879            .save_message(cid, "user", "hello")
880            .await
881            .unwrap();
882        let id2 = memory
883            .sqlite()
884            .save_message(cid, "assistant", "hi")
885            .await
886            .unwrap();
887
888        let memory_arc = std::sync::Arc::new(memory);
889        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
890            memory_arc.clone(),
891            cid,
892            50,
893            5,
894            100,
895        );
896
897        agent.load_history().await.unwrap();
898
899        // Both episodic messages must have session_count = 1 after restore.
900        let counts: Vec<i64> = zeph_db::query_scalar(
901            "SELECT session_count FROM messages WHERE id IN (?, ?) ORDER BY id",
902        )
903        .bind(id1)
904        .bind(id2)
905        .fetch_all(memory_arc.sqlite().pool())
906        .await
907        .unwrap();
908        assert_eq!(
909            counts,
910            vec![1, 1],
911            "session_count must be 1 after first restore"
912        );
913    }
914
915    #[tokio::test]
916    async fn load_history_does_not_increment_session_count_for_new_conversation() {
917        let provider = mock_provider(vec![]);
918        let channel = MockChannel::new(vec![]);
919        let registry = create_test_registry();
920        let executor = MockToolExecutor::no_tools();
921
922        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
923        let cid = memory.sqlite().create_conversation().await.unwrap();
924
925        // No messages saved — empty conversation.
926        let memory_arc = std::sync::Arc::new(memory);
927        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
928            memory_arc.clone(),
929            cid,
930            50,
931            5,
932            100,
933        );
934
935        agent.load_history().await.unwrap();
936
937        // No rows → no session_count increments → query returns empty.
938        let counts: Vec<i64> =
939            zeph_db::query_scalar("SELECT session_count FROM messages WHERE conversation_id = ?")
940                .bind(cid)
941                .fetch_all(memory_arc.sqlite().pool())
942                .await
943                .unwrap();
944        assert!(counts.is_empty(), "new conversation must have no messages");
945    }
946
947    #[tokio::test]
948    async fn persist_message_without_memory_silently_returns() {
949        // No memory configured — persist_message must not panic
950        let provider = mock_provider(vec![]);
951        let channel = MockChannel::new(vec![]);
952        let registry = create_test_registry();
953        let executor = MockToolExecutor::no_tools();
954        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
955
956        // Must not panic and must complete
957        agent.persist_message(Role::User, "hello", &[], false).await;
958    }
959
960    #[tokio::test]
961    async fn persist_message_assistant_autosave_false_uses_save_only() {
962        let provider = mock_provider(vec![]);
963        let channel = MockChannel::new(vec![]);
964        let registry = create_test_registry();
965        let executor = MockToolExecutor::no_tools();
966
967        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
968        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
969        let cid = memory.sqlite().create_conversation().await.unwrap();
970
971        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
972            .with_metrics(tx)
973            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
974        agent.services.memory.persistence.autosave_assistant = false;
975        agent.services.memory.persistence.autosave_min_length = 20;
976
977        agent
978            .persist_message(Role::Assistant, "short assistant reply", &[], false)
979            .await;
980
981        let history = agent
982            .services
983            .memory
984            .persistence
985            .memory
986            .as_ref()
987            .unwrap()
988            .sqlite()
989            .load_history(cid, 50)
990            .await
991            .unwrap();
992        assert_eq!(history.len(), 1, "message must be saved");
993        assert_eq!(history[0].content, "short assistant reply");
994        // embeddings_generated must remain 0 — save_only path does not embed
995        assert_eq!(rx.borrow().embeddings_generated, 0);
996    }
997
998    #[tokio::test]
999    async fn persist_message_assistant_below_min_length_uses_save_only() {
1000        let provider = mock_provider(vec![]);
1001        let channel = MockChannel::new(vec![]);
1002        let registry = create_test_registry();
1003        let executor = MockToolExecutor::no_tools();
1004
1005        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1006        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1007        let cid = memory.sqlite().create_conversation().await.unwrap();
1008
1009        // autosave_assistant=true but min_length=1000 — short content falls back to save_only
1010        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1011            .with_metrics(tx)
1012            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1013        agent.services.memory.persistence.autosave_assistant = true;
1014        agent.services.memory.persistence.autosave_min_length = 1000;
1015
1016        agent
1017            .persist_message(Role::Assistant, "too short", &[], false)
1018            .await;
1019
1020        let history = agent
1021            .services
1022            .memory
1023            .persistence
1024            .memory
1025            .as_ref()
1026            .unwrap()
1027            .sqlite()
1028            .load_history(cid, 50)
1029            .await
1030            .unwrap();
1031        assert_eq!(history.len(), 1, "message must be saved");
1032        assert_eq!(history[0].content, "too short");
1033        assert_eq!(rx.borrow().embeddings_generated, 0);
1034    }
1035
1036    #[tokio::test]
1037    async fn persist_message_assistant_at_min_length_boundary_uses_embed() {
1038        // content.len() == autosave_min_length → should_embed = true (>= boundary).
1039        let provider = mock_provider(vec![]);
1040        let channel = MockChannel::new(vec![]);
1041        let registry = create_test_registry();
1042        let executor = MockToolExecutor::no_tools();
1043
1044        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1045        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1046        let cid = memory.sqlite().create_conversation().await.unwrap();
1047
1048        let min_length = 10usize;
1049        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1050            .with_metrics(tx)
1051            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1052        agent.services.memory.persistence.autosave_assistant = true;
1053        agent.services.memory.persistence.autosave_min_length = min_length;
1054
1055        // Exact boundary: len == min_length → embed path.
1056        let content_at_boundary = "A".repeat(min_length);
1057        assert_eq!(content_at_boundary.len(), min_length);
1058        agent
1059            .persist_message(Role::Assistant, &content_at_boundary, &[], false)
1060            .await;
1061
1062        // sqlite_message_count must be incremented regardless of embedding success.
1063        assert_eq!(rx.borrow().sqlite_message_count, 1);
1064    }
1065
1066    #[tokio::test]
1067    async fn persist_message_assistant_one_below_min_length_uses_save_only() {
1068        // content.len() == autosave_min_length - 1 → should_embed = false (below boundary).
1069        let provider = mock_provider(vec![]);
1070        let channel = MockChannel::new(vec![]);
1071        let registry = create_test_registry();
1072        let executor = MockToolExecutor::no_tools();
1073
1074        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1075        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1076        let cid = memory.sqlite().create_conversation().await.unwrap();
1077
1078        let min_length = 10usize;
1079        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1080            .with_metrics(tx)
1081            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1082        agent.services.memory.persistence.autosave_assistant = true;
1083        agent.services.memory.persistence.autosave_min_length = min_length;
1084
1085        // One below boundary: len == min_length - 1 → save_only path, no embedding.
1086        let content_below_boundary = "A".repeat(min_length - 1);
1087        assert_eq!(content_below_boundary.len(), min_length - 1);
1088        agent
1089            .persist_message(Role::Assistant, &content_below_boundary, &[], false)
1090            .await;
1091
1092        let history = agent
1093            .services
1094            .memory
1095            .persistence
1096            .memory
1097            .as_ref()
1098            .unwrap()
1099            .sqlite()
1100            .load_history(cid, 50)
1101            .await
1102            .unwrap();
1103        assert_eq!(history.len(), 1, "message must still be saved");
1104        // save_only path does not embed.
1105        assert_eq!(rx.borrow().embeddings_generated, 0);
1106    }
1107
1108    #[tokio::test]
1109    async fn persist_message_increments_unsummarized_count() {
1110        let provider = mock_provider(vec![]);
1111        let channel = MockChannel::new(vec![]);
1112        let registry = create_test_registry();
1113        let executor = MockToolExecutor::no_tools();
1114
1115        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1116        let cid = memory.sqlite().create_conversation().await.unwrap();
1117
1118        // threshold=100 ensures no summarization is triggered
1119        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1120            std::sync::Arc::new(memory),
1121            cid,
1122            50,
1123            5,
1124            100,
1125        );
1126
1127        assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1128
1129        agent.persist_message(Role::User, "first", &[], false).await;
1130        assert_eq!(agent.services.memory.persistence.unsummarized_count, 1);
1131
1132        agent
1133            .persist_message(Role::User, "second", &[], false)
1134            .await;
1135        assert_eq!(agent.services.memory.persistence.unsummarized_count, 2);
1136    }
1137
1138    #[tokio::test]
1139    async fn check_summarization_resets_counter_on_success() {
1140        let provider = mock_provider(vec![]);
1141        let channel = MockChannel::new(vec![]);
1142        let registry = create_test_registry();
1143        let executor = MockToolExecutor::no_tools();
1144
1145        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1146        let cid = memory.sqlite().create_conversation().await.unwrap();
1147
1148        // threshold=1 so the second persist triggers summarization check (count > threshold)
1149        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1150            std::sync::Arc::new(memory),
1151            cid,
1152            50,
1153            5,
1154            1,
1155        );
1156
1157        agent.persist_message(Role::User, "msg1", &[], false).await;
1158        agent.persist_message(Role::User, "msg2", &[], false).await;
1159
1160        // After summarization attempt (summarize returns Ok(None) since no messages qualify),
1161        // the counter is NOT reset to 0 — only reset on Ok(Some(_)).
1162        // This verifies check_summarization is called and the guard condition works.
1163        // unsummarized_count must be >= 2 before any summarization or 0 if summarization ran.
1164        assert!(agent.services.memory.persistence.unsummarized_count <= 2);
1165    }
1166
1167    #[tokio::test]
1168    async fn unsummarized_count_not_incremented_without_memory() {
1169        let provider = mock_provider(vec![]);
1170        let channel = MockChannel::new(vec![]);
1171        let registry = create_test_registry();
1172        let executor = MockToolExecutor::no_tools();
1173        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1174
1175        agent.persist_message(Role::User, "hello", &[], false).await;
1176        // No memory configured — persist_message returns early, counter must stay 0.
1177        assert_eq!(agent.services.memory.persistence.unsummarized_count, 0);
1178    }
1179
1180    // R-CRIT-01: unit tests for enqueue_graph_extraction_task guard conditions.
1181    mod graph_extraction_guards {
1182        use super::*;
1183        use crate::config::GraphConfig;
1184        use zeph_llm::provider::MessageMetadata;
1185        use zeph_memory::graph::GraphStore;
1186
1187        fn enabled_graph_config() -> GraphConfig {
1188            GraphConfig {
1189                enabled: true,
1190                ..GraphConfig::default()
1191            }
1192        }
1193
1194        async fn agent_with_graph(
1195            provider: &AnyProvider,
1196            config: GraphConfig,
1197        ) -> Agent<MockChannel> {
1198            let memory =
1199                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1200            let cid = memory.sqlite().create_conversation().await.unwrap();
1201            Agent::new(
1202                provider.clone(),
1203                MockChannel::new(vec![]),
1204                create_test_registry(),
1205                None,
1206                5,
1207                MockToolExecutor::no_tools(),
1208            )
1209            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1210            .with_graph_config(config)
1211        }
1212
1213        #[tokio::test]
1214        async fn injection_flag_guard_skips_extraction() {
1215            // has_injection_flags=true → extraction must be skipped; no counter in graph_metadata.
1216            let provider = mock_provider(vec![]);
1217            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1218            let pool = agent
1219                .services
1220                .memory
1221                .persistence
1222                .memory
1223                .as_ref()
1224                .unwrap()
1225                .sqlite()
1226                .pool()
1227                .clone();
1228
1229            agent
1230                .enqueue_graph_extraction_task("I use Rust", true, false)
1231                .await;
1232
1233            // Give any accidental spawn time to settle.
1234            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1235
1236            let store = GraphStore::new(pool);
1237            let count = store.get_metadata("extraction_count").await.unwrap();
1238            assert!(
1239                count.is_none(),
1240                "injection flag must prevent extraction_count from being written"
1241            );
1242        }
1243
1244        #[tokio::test]
1245        async fn disabled_config_guard_skips_extraction() {
1246            // graph.enabled=false → extraction must be skipped.
1247            let provider = mock_provider(vec![]);
1248            let disabled_cfg = GraphConfig {
1249                enabled: false,
1250                ..GraphConfig::default()
1251            };
1252            let mut agent = agent_with_graph(&provider, disabled_cfg).await;
1253            let pool = agent
1254                .services
1255                .memory
1256                .persistence
1257                .memory
1258                .as_ref()
1259                .unwrap()
1260                .sqlite()
1261                .pool()
1262                .clone();
1263
1264            agent
1265                .enqueue_graph_extraction_task("I use Rust", false, false)
1266                .await;
1267
1268            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1269
1270            let store = GraphStore::new(pool);
1271            let count = store.get_metadata("extraction_count").await.unwrap();
1272            assert!(
1273                count.is_none(),
1274                "disabled graph config must prevent extraction"
1275            );
1276        }
1277
1278        #[tokio::test]
1279        async fn happy_path_fires_extraction() {
1280            // With enabled config and no injection flags, extraction is spawned.
1281            // MockProvider returns None (no entities), but the counter must be incremented.
1282            let provider = mock_provider(vec![]);
1283            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1284            let pool = agent
1285                .services
1286                .memory
1287                .persistence
1288                .memory
1289                .as_ref()
1290                .unwrap()
1291                .sqlite()
1292                .pool()
1293                .clone();
1294
1295            agent
1296                .enqueue_graph_extraction_task("I use Rust for systems programming", false, false)
1297                .await;
1298
1299            // Wait for the spawned task to complete.
1300            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1301
1302            let store = GraphStore::new(pool);
1303            let count = store.get_metadata("extraction_count").await.unwrap();
1304            assert!(
1305                count.is_some(),
1306                "happy-path extraction must increment extraction_count"
1307            );
1308        }
1309
1310        #[tokio::test]
1311        async fn tool_result_parts_guard_skips_extraction() {
1312            // FIX-1 regression: has_tool_result_parts=true → extraction must be skipped.
1313            // Tool result messages contain raw structured output (TOML, JSON, code) — not
1314            // conversational content. Extracting entities from them produces graph noise.
1315            let provider = mock_provider(vec![]);
1316            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1317            let pool = agent
1318                .services
1319                .memory
1320                .persistence
1321                .memory
1322                .as_ref()
1323                .unwrap()
1324                .sqlite()
1325                .pool()
1326                .clone();
1327
1328            agent
1329                .enqueue_graph_extraction_task(
1330                    "[tool_result: abc123]\nprovider_type = \"claude\"\nallowed_commands = []",
1331                    false,
1332                    true, // has_tool_result_parts
1333                )
1334                .await;
1335
1336            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1337
1338            let store = GraphStore::new(pool);
1339            let count = store.get_metadata("extraction_count").await.unwrap();
1340            assert!(
1341                count.is_none(),
1342                "tool result message must not trigger graph extraction"
1343            );
1344        }
1345
1346        #[tokio::test]
1347        async fn context_filter_excludes_tool_result_messages() {
1348            // FIX-2: context_messages must not include tool result user messages.
1349            // When enqueue_graph_extraction_task collects context, it filters out
1350            // Role::User messages that contain ToolResult parts — only conversational
1351            // user messages are included as extraction context.
1352            //
1353            // This test verifies the guard fires: a tool result message alone is passed
1354            // (has_tool_result_parts=true) → extraction is skipped entirely, so context
1355            // filtering is not exercised. We verify FIX-2 by ensuring a prior tool result
1356            // message in agent.msg.messages is excluded when a subsequent conversational message
1357            // triggers extraction.
1358            let provider = mock_provider(vec![]);
1359            let mut agent = agent_with_graph(&provider, enabled_graph_config()).await;
1360
1361            // Add a tool result message to the agent's message history — this simulates
1362            // a tool call response that arrived before the current conversational turn.
1363            agent.msg.messages.push(Message {
1364                role: Role::User,
1365                content: "[tool_result: abc]\nprovider_type = \"openai\"".to_owned(),
1366                parts: vec![MessagePart::ToolResult {
1367                    tool_use_id: "abc".to_owned(),
1368                    content: "provider_type = \"openai\"".to_owned(),
1369                    is_error: false,
1370                }],
1371                metadata: MessageMetadata::default(),
1372            });
1373
1374            let pool = agent
1375                .services
1376                .memory
1377                .persistence
1378                .memory
1379                .as_ref()
1380                .unwrap()
1381                .sqlite()
1382                .pool()
1383                .clone();
1384
1385            // Trigger extraction for a conversational message (not a tool result).
1386            agent
1387                .enqueue_graph_extraction_task(
1388                    "I prefer Rust for systems programming",
1389                    false,
1390                    false,
1391                )
1392                .await;
1393
1394            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1395
1396            // Extraction must have fired (conversational message, no injection flags).
1397            let store = GraphStore::new(pool);
1398            let count = store.get_metadata("extraction_count").await.unwrap();
1399            assert!(
1400                count.is_some(),
1401                "conversational message must trigger extraction even with prior tool result in history"
1402            );
1403        }
1404    }
1405
1406    // R-PERS-01: unit tests for enqueue_persona_extraction_task guard conditions.
1407    mod persona_extraction_guards {
1408        use super::*;
1409        use zeph_config::PersonaConfig;
1410        use zeph_llm::provider::MessageMetadata;
1411
1412        fn enabled_persona_config() -> PersonaConfig {
1413            PersonaConfig {
1414                enabled: true,
1415                min_messages: 1,
1416                ..PersonaConfig::default()
1417            }
1418        }
1419
1420        async fn agent_with_persona(
1421            provider: &AnyProvider,
1422            config: PersonaConfig,
1423        ) -> Agent<MockChannel> {
1424            let memory =
1425                test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1426            let cid = memory.sqlite().create_conversation().await.unwrap();
1427            let mut agent = Agent::new(
1428                provider.clone(),
1429                MockChannel::new(vec![]),
1430                create_test_registry(),
1431                None,
1432                5,
1433                MockToolExecutor::no_tools(),
1434            )
1435            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1436            agent.services.memory.extraction.persona_config = config;
1437            agent
1438        }
1439
1440        #[tokio::test]
1441        async fn disabled_config_skips_spawn() {
1442            // persona.enabled=false → nothing is spawned; persona_memory stays empty.
1443            let provider = mock_provider(vec![]);
1444            let mut agent = agent_with_persona(
1445                &provider,
1446                PersonaConfig {
1447                    enabled: false,
1448                    ..PersonaConfig::default()
1449                },
1450            )
1451            .await;
1452
1453            // Inject a user message so message count is above threshold.
1454            agent.msg.messages.push(zeph_llm::provider::Message {
1455                role: Role::User,
1456                content: "I prefer Rust for systems programming".to_owned(),
1457                parts: vec![],
1458                metadata: MessageMetadata::default(),
1459            });
1460
1461            agent.enqueue_persona_extraction_task();
1462
1463            let store = agent
1464                .services
1465                .memory
1466                .persistence
1467                .memory
1468                .as_ref()
1469                .unwrap()
1470                .sqlite()
1471                .clone();
1472            let count = store.count_persona_facts().await.unwrap();
1473            assert_eq!(count, 0, "disabled persona config must not write any facts");
1474        }
1475
1476        #[tokio::test]
1477        async fn below_min_messages_skips_spawn() {
1478            // min_messages=3 but only 2 user messages → should skip.
1479            let provider = mock_provider(vec![]);
1480            let mut agent = agent_with_persona(
1481                &provider,
1482                PersonaConfig {
1483                    enabled: true,
1484                    min_messages: 3,
1485                    ..PersonaConfig::default()
1486                },
1487            )
1488            .await;
1489
1490            for text in ["I use Rust", "I prefer async code"] {
1491                agent.msg.messages.push(zeph_llm::provider::Message {
1492                    role: Role::User,
1493                    content: text.to_owned(),
1494                    parts: vec![],
1495                    metadata: MessageMetadata::default(),
1496                });
1497            }
1498
1499            agent.enqueue_persona_extraction_task();
1500
1501            let store = agent
1502                .services
1503                .memory
1504                .persistence
1505                .memory
1506                .as_ref()
1507                .unwrap()
1508                .sqlite()
1509                .clone();
1510            let count = store.count_persona_facts().await.unwrap();
1511            assert_eq!(
1512                count, 0,
1513                "below min_messages threshold must not trigger extraction"
1514            );
1515        }
1516
1517        #[tokio::test]
1518        async fn no_memory_skips_spawn() {
1519            // memory=None → must exit early without panic.
1520            let provider = mock_provider(vec![]);
1521            let channel = MockChannel::new(vec![]);
1522            let registry = create_test_registry();
1523            let executor = MockToolExecutor::no_tools();
1524            let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
1525            agent.services.memory.extraction.persona_config = enabled_persona_config();
1526            agent.msg.messages.push(zeph_llm::provider::Message {
1527                role: Role::User,
1528                content: "I like Rust".to_owned(),
1529                parts: vec![],
1530                metadata: MessageMetadata::default(),
1531            });
1532
1533            // Must not panic even without memory.
1534            agent.enqueue_persona_extraction_task();
1535        }
1536
1537        #[tokio::test]
1538        async fn enabled_enough_messages_spawns_extraction() {
1539            // enabled=true, min_messages=1, self-referential message → extraction runs eagerly
1540            // (not fire-and-forget) and chat() is called on the provider, verified via MockProvider.
1541            use zeph_llm::mock::MockProvider;
1542            let (mock, recorded) = MockProvider::default().with_recording();
1543            let provider = AnyProvider::Mock(mock);
1544            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1545
1546            agent.msg.messages.push(zeph_llm::provider::Message {
1547                role: Role::User,
1548                content: "I prefer Rust for systems programming".to_owned(),
1549                parts: vec![],
1550                metadata: MessageMetadata::default(),
1551            });
1552
1553            agent.enqueue_persona_extraction_task();
1554
1555            // Persona extraction runs via BackgroundSupervisor. Wait for tasks to complete.
1556            agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1557
1558            let calls = recorded.lock().unwrap();
1559            assert!(
1560                !calls.is_empty(),
1561                "happy-path: provider.chat() must be called when extraction completes"
1562            );
1563        }
1564
1565        #[tokio::test]
1566        async fn messages_capped_at_eight() {
1567            // More than 8 user messages → only 8 are passed to extraction.
1568            // Each message contains "I" so self-referential gate passes.
1569            use zeph_llm::mock::MockProvider;
1570            let (mock, recorded) = MockProvider::default().with_recording();
1571            let provider = AnyProvider::Mock(mock);
1572            let mut agent = agent_with_persona(&provider, enabled_persona_config()).await;
1573
1574            for i in 0..12u32 {
1575                agent.msg.messages.push(zeph_llm::provider::Message {
1576                    role: Role::User,
1577                    content: format!("I like message {i}"),
1578                    parts: vec![],
1579                    metadata: MessageMetadata::default(),
1580                });
1581            }
1582
1583            agent.enqueue_persona_extraction_task();
1584
1585            // Allow the background task to complete before asserting.
1586            agent.runtime.lifecycle.supervisor.join_all_for_test().await;
1587
1588            // Verify extraction ran (provider was called).
1589            let calls = recorded.lock().unwrap();
1590            assert!(
1591                !calls.is_empty(),
1592                "extraction must run when enough messages present"
1593            );
1594            // Verify the prompt sent to the provider does not contain messages beyond the 8th.
1595            let prompt = &calls[0];
1596            let user_text = prompt
1597                .iter()
1598                .filter(|m| m.role == Role::User)
1599                .map(|m| m.content.as_str())
1600                .collect::<Vec<_>>()
1601                .join(" ");
1602            // Messages 8..11 ("I like message 8".."I like message 11") must not appear.
1603            assert!(
1604                !user_text.contains("I like message 8"),
1605                "message index 8 must be excluded from extraction input"
1606            );
1607        }
1608
1609        #[test]
1610        fn long_message_truncated_at_char_boundary() {
1611            // Directly verify the per-message truncation logic applied in
1612            // enqueue_persona_extraction_task: a content > 2048 bytes must be capped
1613            // to exactly floor_char_boundary(2048).
1614            let long_content = "x".repeat(3000);
1615            let truncated = if long_content.len() > 2048 {
1616                long_content[..long_content.floor_char_boundary(2048)].to_owned()
1617            } else {
1618                long_content.clone()
1619            };
1620            assert_eq!(
1621                truncated.len(),
1622                2048,
1623                "ASCII content must be truncated to exactly 2048 bytes"
1624            );
1625
1626            // Multi-byte boundary: build a string whose char boundary falls before 2048.
1627            let multi = "é".repeat(1500); // each 'é' is 2 bytes → 3000 bytes total
1628            let truncated_multi = if multi.len() > 2048 {
1629                multi[..multi.floor_char_boundary(2048)].to_owned()
1630            } else {
1631                multi.clone()
1632            };
1633            assert!(
1634                truncated_multi.len() <= 2048,
1635                "multi-byte content must not exceed 2048 bytes"
1636            );
1637            assert!(truncated_multi.is_char_boundary(truncated_multi.len()));
1638        }
1639    }
1640
1641    #[tokio::test]
1642    async fn persist_message_user_always_embeds_regardless_of_autosave_flag() {
1643        let provider = mock_provider(vec![]);
1644        let channel = MockChannel::new(vec![]);
1645        let registry = create_test_registry();
1646        let executor = MockToolExecutor::no_tools();
1647
1648        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
1649        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1650        let cid = memory.sqlite().create_conversation().await.unwrap();
1651
1652        // autosave_assistant=false — but User role always takes embedding path
1653        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
1654            .with_metrics(tx)
1655            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
1656        agent.services.memory.persistence.autosave_assistant = false;
1657        agent.services.memory.persistence.autosave_min_length = 20;
1658
1659        let long_user_msg = "A".repeat(100);
1660        agent
1661            .persist_message(Role::User, &long_user_msg, &[], false)
1662            .await;
1663
1664        let history = agent
1665            .services
1666            .memory
1667            .persistence
1668            .memory
1669            .as_ref()
1670            .unwrap()
1671            .sqlite()
1672            .load_history(cid, 50)
1673            .await
1674            .unwrap();
1675        assert_eq!(history.len(), 1, "user message must be saved");
1676        // User messages go through remember_with_parts (embedding path).
1677        // sqlite_message_count must increment regardless of Qdrant availability.
1678        assert_eq!(rx.borrow().sqlite_message_count, 1);
1679    }
1680
    // Round-trip tests: verify that persist_message saves the correct parts and that they
    // are read back unchanged from the SQLite store (`sqlite().load_history`).
1683
1684    #[tokio::test]
1685    async fn persist_message_saves_correct_tool_use_parts() {
1686        use zeph_llm::provider::MessagePart;
1687
1688        let provider = mock_provider(vec![]);
1689        let channel = MockChannel::new(vec![]);
1690        let registry = create_test_registry();
1691        let executor = MockToolExecutor::no_tools();
1692
1693        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1694        let cid = memory.sqlite().create_conversation().await.unwrap();
1695
1696        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1697            std::sync::Arc::new(memory),
1698            cid,
1699            50,
1700            5,
1701            100,
1702        );
1703
1704        let parts = vec![MessagePart::ToolUse {
1705            id: "call_abc123".to_string(),
1706            name: "read_file".to_string(),
1707            input: serde_json::json!({"path": "/tmp/test.txt"}),
1708        }];
1709        let content = "[tool_use: read_file(call_abc123)]";
1710
1711        agent
1712            .persist_message(Role::Assistant, content, &parts, false)
1713            .await;
1714
1715        let history = agent
1716            .services
1717            .memory
1718            .persistence
1719            .memory
1720            .as_ref()
1721            .unwrap()
1722            .sqlite()
1723            .load_history(cid, 50)
1724            .await
1725            .unwrap();
1726
1727        assert_eq!(history.len(), 1);
1728        assert_eq!(history[0].role, Role::Assistant);
1729        assert_eq!(history[0].content, content);
1730        assert_eq!(history[0].parts.len(), 1);
1731        match &history[0].parts[0] {
1732            MessagePart::ToolUse { id, name, .. } => {
1733                assert_eq!(id, "call_abc123");
1734                assert_eq!(name, "read_file");
1735            }
1736            other => panic!("expected ToolUse part, got {other:?}"),
1737        }
1738        // Regression guard: assistant message must NOT have ToolResult parts
1739        assert!(
1740            !history[0]
1741                .parts
1742                .iter()
1743                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1744            "assistant message must not contain ToolResult parts"
1745        );
1746    }
1747
1748    #[tokio::test]
1749    async fn persist_message_saves_correct_tool_result_parts() {
1750        use zeph_llm::provider::MessagePart;
1751
1752        let provider = mock_provider(vec![]);
1753        let channel = MockChannel::new(vec![]);
1754        let registry = create_test_registry();
1755        let executor = MockToolExecutor::no_tools();
1756
1757        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1758        let cid = memory.sqlite().create_conversation().await.unwrap();
1759
1760        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1761            std::sync::Arc::new(memory),
1762            cid,
1763            50,
1764            5,
1765            100,
1766        );
1767
1768        let parts = vec![MessagePart::ToolResult {
1769            tool_use_id: "call_abc123".to_string(),
1770            content: "file contents here".to_string(),
1771            is_error: false,
1772        }];
1773        let content = "[tool_result: call_abc123]\nfile contents here";
1774
1775        agent
1776            .persist_message(Role::User, content, &parts, false)
1777            .await;
1778
1779        let history = agent
1780            .services
1781            .memory
1782            .persistence
1783            .memory
1784            .as_ref()
1785            .unwrap()
1786            .sqlite()
1787            .load_history(cid, 50)
1788            .await
1789            .unwrap();
1790
1791        assert_eq!(history.len(), 1);
1792        assert_eq!(history[0].role, Role::User);
1793        assert_eq!(history[0].content, content);
1794        assert_eq!(history[0].parts.len(), 1);
1795        match &history[0].parts[0] {
1796            MessagePart::ToolResult {
1797                tool_use_id,
1798                content: result_content,
1799                is_error,
1800            } => {
1801                assert_eq!(tool_use_id, "call_abc123");
1802                assert_eq!(result_content, "file contents here");
1803                assert!(!is_error);
1804            }
1805            other => panic!("expected ToolResult part, got {other:?}"),
1806        }
1807        // Regression guard: user message with ToolResult must NOT have ToolUse parts
1808        assert!(
1809            !history[0]
1810                .parts
1811                .iter()
1812                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1813            "user ToolResult message must not contain ToolUse parts"
1814        );
1815    }
1816
1817    #[tokio::test]
1818    async fn persist_message_roundtrip_preserves_role_part_alignment() {
1819        use zeph_llm::provider::MessagePart;
1820
1821        let provider = mock_provider(vec![]);
1822        let channel = MockChannel::new(vec![]);
1823        let registry = create_test_registry();
1824        let executor = MockToolExecutor::no_tools();
1825
1826        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1827        let cid = memory.sqlite().create_conversation().await.unwrap();
1828
1829        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1830            std::sync::Arc::new(memory),
1831            cid,
1832            50,
1833            5,
1834            100,
1835        );
1836
1837        // Persist assistant message with ToolUse parts
1838        let assistant_parts = vec![MessagePart::ToolUse {
1839            id: "id_1".to_string(),
1840            name: "list_dir".to_string(),
1841            input: serde_json::json!({"path": "/tmp"}),
1842        }];
1843        agent
1844            .persist_message(
1845                Role::Assistant,
1846                "[tool_use: list_dir(id_1)]",
1847                &assistant_parts,
1848                false,
1849            )
1850            .await;
1851
1852        // Persist user message with ToolResult parts
1853        let user_parts = vec![MessagePart::ToolResult {
1854            tool_use_id: "id_1".to_string(),
1855            content: "file1.txt\nfile2.txt".to_string(),
1856            is_error: false,
1857        }];
1858        agent
1859            .persist_message(
1860                Role::User,
1861                "[tool_result: id_1]\nfile1.txt\nfile2.txt",
1862                &user_parts,
1863                false,
1864            )
1865            .await;
1866
1867        let history = agent
1868            .services
1869            .memory
1870            .persistence
1871            .memory
1872            .as_ref()
1873            .unwrap()
1874            .sqlite()
1875            .load_history(cid, 50)
1876            .await
1877            .unwrap();
1878
1879        assert_eq!(history.len(), 2);
1880
1881        // First message: assistant + ToolUse
1882        assert_eq!(history[0].role, Role::Assistant);
1883        assert_eq!(history[0].content, "[tool_use: list_dir(id_1)]");
1884        assert!(
1885            matches!(&history[0].parts[0], MessagePart::ToolUse { id, .. } if id == "id_1"),
1886            "first message must be assistant ToolUse"
1887        );
1888
1889        // Second message: user + ToolResult
1890        assert_eq!(history[1].role, Role::User);
1891        assert_eq!(
1892            history[1].content,
1893            "[tool_result: id_1]\nfile1.txt\nfile2.txt"
1894        );
1895        assert!(
1896            matches!(&history[1].parts[0], MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "id_1"),
1897            "second message must be user ToolResult"
1898        );
1899
1900        // Cross-role regression guard: no swapped parts
1901        assert!(
1902            !history[0]
1903                .parts
1904                .iter()
1905                .any(|p| matches!(p, MessagePart::ToolResult { .. })),
1906            "assistant message must not have ToolResult parts"
1907        );
1908        assert!(
1909            !history[1]
1910                .parts
1911                .iter()
1912                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
1913            "user message must not have ToolUse parts"
1914        );
1915    }
1916
1917    #[tokio::test]
1918    async fn persist_message_saves_correct_tool_output_parts() {
1919        use zeph_llm::provider::MessagePart;
1920
1921        let provider = mock_provider(vec![]);
1922        let channel = MockChannel::new(vec![]);
1923        let registry = create_test_registry();
1924        let executor = MockToolExecutor::no_tools();
1925
1926        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1927        let cid = memory.sqlite().create_conversation().await.unwrap();
1928
1929        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
1930            std::sync::Arc::new(memory),
1931            cid,
1932            50,
1933            5,
1934            100,
1935        );
1936
1937        let parts = vec![MessagePart::ToolOutput {
1938            tool_name: "shell".into(),
1939            body: "hello from shell".to_string(),
1940            compacted_at: None,
1941        }];
1942        let content = "[tool: shell]\nhello from shell";
1943
1944        agent
1945            .persist_message(Role::User, content, &parts, false)
1946            .await;
1947
1948        let history = agent
1949            .services
1950            .memory
1951            .persistence
1952            .memory
1953            .as_ref()
1954            .unwrap()
1955            .sqlite()
1956            .load_history(cid, 50)
1957            .await
1958            .unwrap();
1959
1960        assert_eq!(history.len(), 1);
1961        assert_eq!(history[0].role, Role::User);
1962        assert_eq!(history[0].content, content);
1963        assert_eq!(history[0].parts.len(), 1);
1964        match &history[0].parts[0] {
1965            MessagePart::ToolOutput {
1966                tool_name,
1967                body,
1968                compacted_at,
1969            } => {
1970                assert_eq!(tool_name, "shell");
1971                assert_eq!(body, "hello from shell");
1972                assert!(compacted_at.is_none());
1973            }
1974            other => panic!("expected ToolOutput part, got {other:?}"),
1975        }
1976    }
1977
    // --- sanitize_tool_pairs tests (driven through `Agent::load_history`) ---
1979
1980    #[tokio::test]
1981    async fn load_history_removes_trailing_orphan_tool_use() {
1982        use zeph_llm::provider::MessagePart;
1983
1984        let provider = mock_provider(vec![]);
1985        let channel = MockChannel::new(vec![]);
1986        let registry = create_test_registry();
1987        let executor = MockToolExecutor::no_tools();
1988
1989        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
1990        let cid = memory.sqlite().create_conversation().await.unwrap();
1991        let sqlite = memory.sqlite();
1992
1993        // user message (normal)
1994        sqlite
1995            .save_message(cid, "user", "do something with a tool")
1996            .await
1997            .unwrap();
1998
1999        // assistant message with ToolUse parts — no following tool_result (orphan)
2000        let parts = serde_json::to_string(&[MessagePart::ToolUse {
2001            id: "call_orphan".to_string(),
2002            name: "shell".to_string(),
2003            input: serde_json::json!({"command": "ls"}),
2004        }])
2005        .unwrap();
2006        sqlite
2007            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_orphan)]", &parts)
2008            .await
2009            .unwrap();
2010
2011        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2012            std::sync::Arc::new(memory),
2013            cid,
2014            50,
2015            5,
2016            100,
2017        );
2018
2019        let messages_before = agent.msg.messages.len();
2020        agent.load_history().await.unwrap();
2021
2022        // Only the user message should be loaded; orphaned assistant tool_use removed.
2023        assert_eq!(
2024            agent.msg.messages.len(),
2025            messages_before + 1,
2026            "orphaned trailing tool_use must be removed"
2027        );
2028        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2029    }
2030
2031    #[tokio::test]
2032    async fn load_history_removes_leading_orphan_tool_result() {
2033        use zeph_llm::provider::MessagePart;
2034
2035        let provider = mock_provider(vec![]);
2036        let channel = MockChannel::new(vec![]);
2037        let registry = create_test_registry();
2038        let executor = MockToolExecutor::no_tools();
2039
2040        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2041        let cid = memory.sqlite().create_conversation().await.unwrap();
2042        let sqlite = memory.sqlite();
2043
2044        // Leading orphan: user message with ToolResult but no preceding tool_use
2045        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2046            tool_use_id: "call_missing".to_string(),
2047            content: "result data".to_string(),
2048            is_error: false,
2049        }])
2050        .unwrap();
2051        sqlite
2052            .save_message_with_parts(
2053                cid,
2054                "user",
2055                "[tool_result: call_missing]\nresult data",
2056                &result_parts,
2057            )
2058            .await
2059            .unwrap();
2060
2061        // A valid assistant reply after the orphan
2062        sqlite
2063            .save_message(cid, "assistant", "here is my response")
2064            .await
2065            .unwrap();
2066
2067        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2068            std::sync::Arc::new(memory),
2069            cid,
2070            50,
2071            5,
2072            100,
2073        );
2074
2075        let messages_before = agent.msg.messages.len();
2076        agent.load_history().await.unwrap();
2077
2078        // Orphaned leading tool_result removed; only assistant message kept.
2079        assert_eq!(
2080            agent.msg.messages.len(),
2081            messages_before + 1,
2082            "orphaned leading tool_result must be removed"
2083        );
2084        assert_eq!(agent.msg.messages.last().unwrap().role, Role::Assistant);
2085    }
2086
2087    #[tokio::test]
2088    async fn load_history_preserves_complete_tool_pairs() {
2089        use zeph_llm::provider::MessagePart;
2090
2091        let provider = mock_provider(vec![]);
2092        let channel = MockChannel::new(vec![]);
2093        let registry = create_test_registry();
2094        let executor = MockToolExecutor::no_tools();
2095
2096        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2097        let cid = memory.sqlite().create_conversation().await.unwrap();
2098        let sqlite = memory.sqlite();
2099
2100        // Complete tool_use / tool_result pair
2101        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2102            id: "call_ok".to_string(),
2103            name: "shell".to_string(),
2104            input: serde_json::json!({"command": "pwd"}),
2105        }])
2106        .unwrap();
2107        sqlite
2108            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_ok)]", &use_parts)
2109            .await
2110            .unwrap();
2111
2112        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2113            tool_use_id: "call_ok".to_string(),
2114            content: "/home/user".to_string(),
2115            is_error: false,
2116        }])
2117        .unwrap();
2118        sqlite
2119            .save_message_with_parts(
2120                cid,
2121                "user",
2122                "[tool_result: call_ok]\n/home/user",
2123                &result_parts,
2124            )
2125            .await
2126            .unwrap();
2127
2128        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2129            std::sync::Arc::new(memory),
2130            cid,
2131            50,
2132            5,
2133            100,
2134        );
2135
2136        let messages_before = agent.msg.messages.len();
2137        agent.load_history().await.unwrap();
2138
2139        // Both messages must be preserved.
2140        assert_eq!(
2141            agent.msg.messages.len(),
2142            messages_before + 2,
2143            "complete tool_use/tool_result pair must be preserved"
2144        );
2145        assert_eq!(agent.msg.messages[messages_before].role, Role::Assistant);
2146        assert_eq!(agent.msg.messages[messages_before + 1].role, Role::User);
2147    }
2148
2149    #[tokio::test]
2150    async fn load_history_handles_multiple_trailing_orphans() {
2151        use zeph_llm::provider::MessagePart;
2152
2153        let provider = mock_provider(vec![]);
2154        let channel = MockChannel::new(vec![]);
2155        let registry = create_test_registry();
2156        let executor = MockToolExecutor::no_tools();
2157
2158        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2159        let cid = memory.sqlite().create_conversation().await.unwrap();
2160        let sqlite = memory.sqlite();
2161
2162        // Normal user message
2163        sqlite.save_message(cid, "user", "start").await.unwrap();
2164
2165        // First orphaned tool_use
2166        let parts1 = serde_json::to_string(&[MessagePart::ToolUse {
2167            id: "call_1".to_string(),
2168            name: "shell".to_string(),
2169            input: serde_json::json!({}),
2170        }])
2171        .unwrap();
2172        sqlite
2173            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_1)]", &parts1)
2174            .await
2175            .unwrap();
2176
2177        // Second orphaned tool_use (consecutive, no tool_result between them)
2178        let parts2 = serde_json::to_string(&[MessagePart::ToolUse {
2179            id: "call_2".to_string(),
2180            name: "read_file".to_string(),
2181            input: serde_json::json!({}),
2182        }])
2183        .unwrap();
2184        sqlite
2185            .save_message_with_parts(cid, "assistant", "[tool_use: read_file(call_2)]", &parts2)
2186            .await
2187            .unwrap();
2188
2189        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2190            std::sync::Arc::new(memory),
2191            cid,
2192            50,
2193            5,
2194            100,
2195        );
2196
2197        let messages_before = agent.msg.messages.len();
2198        agent.load_history().await.unwrap();
2199
2200        // Both orphaned tool_use messages removed; only the user message kept.
2201        assert_eq!(
2202            agent.msg.messages.len(),
2203            messages_before + 1,
2204            "all trailing orphaned tool_use messages must be removed"
2205        );
2206        assert_eq!(agent.msg.messages.last().unwrap().role, Role::User);
2207    }
2208
2209    #[tokio::test]
2210    async fn load_history_no_tool_messages_unchanged() {
2211        let provider = mock_provider(vec![]);
2212        let channel = MockChannel::new(vec![]);
2213        let registry = create_test_registry();
2214        let executor = MockToolExecutor::no_tools();
2215
2216        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2217        let cid = memory.sqlite().create_conversation().await.unwrap();
2218        let sqlite = memory.sqlite();
2219
2220        sqlite.save_message(cid, "user", "hello").await.unwrap();
2221        sqlite
2222            .save_message(cid, "assistant", "hi there")
2223            .await
2224            .unwrap();
2225        sqlite
2226            .save_message(cid, "user", "how are you?")
2227            .await
2228            .unwrap();
2229
2230        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2231            std::sync::Arc::new(memory),
2232            cid,
2233            50,
2234            5,
2235            100,
2236        );
2237
2238        let messages_before = agent.msg.messages.len();
2239        agent.load_history().await.unwrap();
2240
2241        // All three plain messages must be preserved.
2242        assert_eq!(
2243            agent.msg.messages.len(),
2244            messages_before + 3,
2245            "plain messages without tool parts must pass through unchanged"
2246        );
2247    }
2248
2249    #[tokio::test]
2250    async fn load_history_removes_both_leading_and_trailing_orphans() {
2251        use zeph_llm::provider::MessagePart;
2252
2253        let provider = mock_provider(vec![]);
2254        let channel = MockChannel::new(vec![]);
2255        let registry = create_test_registry();
2256        let executor = MockToolExecutor::no_tools();
2257
2258        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2259        let cid = memory.sqlite().create_conversation().await.unwrap();
2260        let sqlite = memory.sqlite();
2261
2262        // Leading orphan: user message with ToolResult, no preceding tool_use
2263        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2264            tool_use_id: "call_leading".to_string(),
2265            content: "orphaned result".to_string(),
2266            is_error: false,
2267        }])
2268        .unwrap();
2269        sqlite
2270            .save_message_with_parts(
2271                cid,
2272                "user",
2273                "[tool_result: call_leading]\norphaned result",
2274                &result_parts,
2275            )
2276            .await
2277            .unwrap();
2278
2279        // Valid middle messages
2280        sqlite
2281            .save_message(cid, "user", "what is 2+2?")
2282            .await
2283            .unwrap();
2284        sqlite.save_message(cid, "assistant", "4").await.unwrap();
2285
2286        // Trailing orphan: assistant message with ToolUse, no following tool_result
2287        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2288            id: "call_trailing".to_string(),
2289            name: "shell".to_string(),
2290            input: serde_json::json!({"command": "date"}),
2291        }])
2292        .unwrap();
2293        sqlite
2294            .save_message_with_parts(
2295                cid,
2296                "assistant",
2297                "[tool_use: shell(call_trailing)]",
2298                &use_parts,
2299            )
2300            .await
2301            .unwrap();
2302
2303        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2304            std::sync::Arc::new(memory),
2305            cid,
2306            50,
2307            5,
2308            100,
2309        );
2310
2311        let messages_before = agent.msg.messages.len();
2312        agent.load_history().await.unwrap();
2313
2314        // Both orphans removed; only the 2 valid middle messages kept.
2315        assert_eq!(
2316            agent.msg.messages.len(),
2317            messages_before + 2,
2318            "both leading and trailing orphans must be removed"
2319        );
2320        assert_eq!(agent.msg.messages[messages_before].role, Role::User);
2321        assert_eq!(agent.msg.messages[messages_before].content, "what is 2+2?");
2322        assert_eq!(
2323            agent.msg.messages[messages_before + 1].role,
2324            Role::Assistant
2325        );
2326        assert_eq!(agent.msg.messages[messages_before + 1].content, "4");
2327    }
2328
2329    /// RC1 regression: mid-history assistant[`ToolUse`] without a following user[`ToolResult`]
2330    /// must have its `ToolUse` parts stripped (text preserved). The message count stays the same
2331    /// because the assistant message has a text content fallback; only `ToolUse` parts are
2332    /// removed.
2333    #[tokio::test]
2334    async fn sanitize_tool_pairs_strips_mid_history_orphan_tool_use() {
2335        use zeph_llm::provider::MessagePart;
2336
2337        let provider = mock_provider(vec![]);
2338        let channel = MockChannel::new(vec![]);
2339        let registry = create_test_registry();
2340        let executor = MockToolExecutor::no_tools();
2341
2342        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2343        let cid = memory.sqlite().create_conversation().await.unwrap();
2344        let sqlite = memory.sqlite();
2345
2346        // Normal first exchange.
2347        sqlite
2348            .save_message(cid, "user", "first question")
2349            .await
2350            .unwrap();
2351        sqlite
2352            .save_message(cid, "assistant", "first answer")
2353            .await
2354            .unwrap();
2355
2356        // Mid-history orphan: assistant with ToolUse but NO following ToolResult user message.
2357        // This models the compaction-split scenario (RC2) where replace_conversation hid the
2358        // user[ToolResult] but left the assistant[ToolUse] visible.
2359        let use_parts = serde_json::to_string(&[
2360            MessagePart::ToolUse {
2361                id: "call_mid_1".to_string(),
2362                name: "shell".to_string(),
2363                input: serde_json::json!({"command": "ls"}),
2364            },
2365            MessagePart::Text {
2366                text: "Let me check the files.".to_string(),
2367            },
2368        ])
2369        .unwrap();
2370        sqlite
2371            .save_message_with_parts(cid, "assistant", "Let me check the files.", &use_parts)
2372            .await
2373            .unwrap();
2374
2375        // Another normal exchange after the orphan.
2376        sqlite
2377            .save_message(cid, "user", "second question")
2378            .await
2379            .unwrap();
2380        sqlite
2381            .save_message(cid, "assistant", "second answer")
2382            .await
2383            .unwrap();
2384
2385        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2386            std::sync::Arc::new(memory),
2387            cid,
2388            50,
2389            5,
2390            100,
2391        );
2392
2393        let messages_before = agent.msg.messages.len();
2394        agent.load_history().await.unwrap();
2395
2396        // All 5 messages remain (orphan message kept because it has text), but the orphaned
2397        // message must have its ToolUse parts stripped.
2398        assert_eq!(
2399            agent.msg.messages.len(),
2400            messages_before + 5,
2401            "message count must be 5 (orphan message kept — has text content)"
2402        );
2403
2404        // The orphaned assistant message (index 2 in the loaded slice) must have no ToolUse parts.
2405        let orphan = &agent.msg.messages[messages_before + 2];
2406        assert_eq!(orphan.role, Role::Assistant);
2407        assert!(
2408            !orphan
2409                .parts
2410                .iter()
2411                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
2412            "orphaned ToolUse parts must be stripped from mid-history message"
2413        );
2414        // Text part must be preserved.
2415        assert!(
2416            orphan.parts.iter().any(
2417                |p| matches!(p, MessagePart::Text { text } if text == "Let me check the files.")
2418            ),
2419            "text content of orphaned assistant message must be preserved"
2420        );
2421    }
2422
2423    /// RC3 regression: a user message with empty `content` but non-empty `parts` (`ToolResult`)
2424    /// must NOT be skipped by `load_history`. Previously the empty-content check dropped these
2425    /// messages before `sanitize_tool_pairs` ran, leaving the preceding assistant `ToolUse`
2426    /// orphaned.
2427    #[tokio::test]
2428    async fn load_history_keeps_tool_only_user_message() {
2429        use zeph_llm::provider::MessagePart;
2430
2431        let provider = mock_provider(vec![]);
2432        let channel = MockChannel::new(vec![]);
2433        let registry = create_test_registry();
2434        let executor = MockToolExecutor::no_tools();
2435
2436        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2437        let cid = memory.sqlite().create_conversation().await.unwrap();
2438        let sqlite = memory.sqlite();
2439
2440        // Assistant sends a ToolUse.
2441        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2442            id: "call_rc3".to_string(),
2443            name: "memory_save".to_string(),
2444            input: serde_json::json!({"content": "something"}),
2445        }])
2446        .unwrap();
2447        sqlite
2448            .save_message_with_parts(cid, "assistant", "[tool_use: memory_save]", &use_parts)
2449            .await
2450            .unwrap();
2451
2452        // User message has empty text content but carries ToolResult in parts — native tool pattern.
2453        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2454            tool_use_id: "call_rc3".to_string(),
2455            content: "saved".to_string(),
2456            is_error: false,
2457        }])
2458        .unwrap();
2459        sqlite
2460            .save_message_with_parts(cid, "user", "", &result_parts)
2461            .await
2462            .unwrap();
2463
2464        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2465
2466        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2467            std::sync::Arc::new(memory),
2468            cid,
2469            50,
2470            5,
2471            100,
2472        );
2473
2474        let messages_before = agent.msg.messages.len();
2475        agent.load_history().await.unwrap();
2476
2477        // All 3 messages must be loaded — the empty-content ToolResult user message must NOT be
2478        // dropped.
2479        assert_eq!(
2480            agent.msg.messages.len(),
2481            messages_before + 3,
2482            "user message with empty content but ToolResult parts must not be dropped"
2483        );
2484
2485        // The user message at index 1 must still carry the ToolResult part.
2486        let user_msg = &agent.msg.messages[messages_before + 1];
2487        assert_eq!(user_msg.role, Role::User);
2488        assert!(
2489            user_msg.parts.iter().any(
2490                |p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_rc3")
2491            ),
2492            "ToolResult part must be preserved on user message with empty content"
2493        );
2494    }
2495
2496    /// RC2 reverse pass: a user message with a `ToolResult` whose `tool_use_id` has no matching
2497    /// `ToolUse` in the preceding assistant message must be stripped by
2498    /// `strip_mid_history_orphans`.
2499    #[tokio::test]
2500    async fn strip_orphans_removes_orphaned_tool_result() {
2501        use zeph_llm::provider::MessagePart;
2502
2503        let provider = mock_provider(vec![]);
2504        let channel = MockChannel::new(vec![]);
2505        let registry = create_test_registry();
2506        let executor = MockToolExecutor::no_tools();
2507
2508        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2509        let cid = memory.sqlite().create_conversation().await.unwrap();
2510        let sqlite = memory.sqlite();
2511
2512        // Normal exchange before the orphan.
2513        sqlite.save_message(cid, "user", "hello").await.unwrap();
2514        sqlite.save_message(cid, "assistant", "hi").await.unwrap();
2515
2516        // Assistant message that does NOT contain a ToolUse.
2517        sqlite
2518            .save_message(cid, "assistant", "plain answer")
2519            .await
2520            .unwrap();
2521
2522        // User message references a tool_use_id that was never sent by the preceding assistant.
2523        let orphan_result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2524            tool_use_id: "call_nonexistent".to_string(),
2525            content: "stale result".to_string(),
2526            is_error: false,
2527        }])
2528        .unwrap();
2529        sqlite
2530            .save_message_with_parts(
2531                cid,
2532                "user",
2533                "[tool_result: call_nonexistent]\nstale result",
2534                &orphan_result_parts,
2535            )
2536            .await
2537            .unwrap();
2538
2539        sqlite
2540            .save_message(cid, "assistant", "final")
2541            .await
2542            .unwrap();
2543
2544        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2545            std::sync::Arc::new(memory),
2546            cid,
2547            50,
2548            5,
2549            100,
2550        );
2551
2552        let messages_before = agent.msg.messages.len();
2553        agent.load_history().await.unwrap();
2554
2555        // The orphaned ToolResult part must have been stripped from the user message.
2556        // The user message itself may be removed (parts empty + content non-empty) or kept with
2557        // the text content — but it must NOT retain the orphaned ToolResult part.
2558        let loaded = &agent.msg.messages[messages_before..];
2559        for msg in loaded {
2560            assert!(
2561                !msg.parts.iter().any(|p| matches!(
2562                    p,
2563                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_nonexistent"
2564                )),
2565                "orphaned ToolResult part must be stripped from history"
2566            );
2567        }
2568    }
2569
2570    /// RC2 reverse pass: a complete `tool_use` + `tool_result` pair must pass through the reverse
2571    /// orphan check intact; the fix must not strip valid `ToolResult` parts.
2572    #[tokio::test]
2573    async fn strip_orphans_keeps_complete_pair() {
2574        use zeph_llm::provider::MessagePart;
2575
2576        let provider = mock_provider(vec![]);
2577        let channel = MockChannel::new(vec![]);
2578        let registry = create_test_registry();
2579        let executor = MockToolExecutor::no_tools();
2580
2581        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2582        let cid = memory.sqlite().create_conversation().await.unwrap();
2583        let sqlite = memory.sqlite();
2584
2585        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2586            id: "call_valid".to_string(),
2587            name: "shell".to_string(),
2588            input: serde_json::json!({"command": "ls"}),
2589        }])
2590        .unwrap();
2591        sqlite
2592            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2593            .await
2594            .unwrap();
2595
2596        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2597            tool_use_id: "call_valid".to_string(),
2598            content: "file.rs".to_string(),
2599            is_error: false,
2600        }])
2601        .unwrap();
2602        sqlite
2603            .save_message_with_parts(cid, "user", "", &result_parts)
2604            .await
2605            .unwrap();
2606
2607        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2608            std::sync::Arc::new(memory),
2609            cid,
2610            50,
2611            5,
2612            100,
2613        );
2614
2615        let messages_before = agent.msg.messages.len();
2616        agent.load_history().await.unwrap();
2617
2618        assert_eq!(
2619            agent.msg.messages.len(),
2620            messages_before + 2,
2621            "complete tool_use/tool_result pair must be preserved"
2622        );
2623
2624        let user_msg = &agent.msg.messages[messages_before + 1];
2625        assert!(
2626            user_msg.parts.iter().any(|p| matches!(
2627                p,
2628                MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_valid"
2629            )),
2630            "ToolResult part for a matched tool_use must not be stripped"
2631        );
2632    }
2633
2634    /// RC2 reverse pass: history with a mix of complete pairs and orphaned `ToolResult` messages.
2635    /// Orphaned `ToolResult` parts must be stripped; complete pairs must pass through intact.
2636    #[tokio::test]
2637    async fn strip_orphans_mixed_history() {
2638        use zeph_llm::provider::MessagePart;
2639
2640        let provider = mock_provider(vec![]);
2641        let channel = MockChannel::new(vec![]);
2642        let registry = create_test_registry();
2643        let executor = MockToolExecutor::no_tools();
2644
2645        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2646        let cid = memory.sqlite().create_conversation().await.unwrap();
2647        let sqlite = memory.sqlite();
2648
2649        // First: complete tool_use / tool_result pair.
2650        let use_parts_ok = serde_json::to_string(&[MessagePart::ToolUse {
2651            id: "call_good".to_string(),
2652            name: "shell".to_string(),
2653            input: serde_json::json!({"command": "pwd"}),
2654        }])
2655        .unwrap();
2656        sqlite
2657            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts_ok)
2658            .await
2659            .unwrap();
2660
2661        let result_parts_ok = serde_json::to_string(&[MessagePart::ToolResult {
2662            tool_use_id: "call_good".to_string(),
2663            content: "/home".to_string(),
2664            is_error: false,
2665        }])
2666        .unwrap();
2667        sqlite
2668            .save_message_with_parts(cid, "user", "", &result_parts_ok)
2669            .await
2670            .unwrap();
2671
2672        // Second: plain assistant message followed by an orphaned ToolResult user message.
2673        sqlite
2674            .save_message(cid, "assistant", "text only")
2675            .await
2676            .unwrap();
2677
2678        let orphan_parts = serde_json::to_string(&[MessagePart::ToolResult {
2679            tool_use_id: "call_ghost".to_string(),
2680            content: "ghost result".to_string(),
2681            is_error: false,
2682        }])
2683        .unwrap();
2684        sqlite
2685            .save_message_with_parts(
2686                cid,
2687                "user",
2688                "[tool_result: call_ghost]\nghost result",
2689                &orphan_parts,
2690            )
2691            .await
2692            .unwrap();
2693
2694        sqlite
2695            .save_message(cid, "assistant", "final reply")
2696            .await
2697            .unwrap();
2698
2699        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2700            std::sync::Arc::new(memory),
2701            cid,
2702            50,
2703            5,
2704            100,
2705        );
2706
2707        let messages_before = agent.msg.messages.len();
2708        agent.load_history().await.unwrap();
2709
2710        let loaded = &agent.msg.messages[messages_before..];
2711
2712        // The orphaned ToolResult part must not appear in any message.
2713        for msg in loaded {
2714            assert!(
2715                !msg.parts.iter().any(|p| matches!(
2716                    p,
2717                    MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_ghost"
2718                )),
2719                "orphaned ToolResult (call_ghost) must be stripped from history"
2720            );
2721        }
2722
2723        // The matched ToolResult must still be present on the user message following the
2724        // first assistant message.
2725        let has_good_result = loaded.iter().any(|msg| {
2726            msg.role == Role::User
2727                && msg.parts.iter().any(|p| {
2728                    matches!(
2729                        p,
2730                        MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "call_good"
2731                    )
2732                })
2733        });
2734        assert!(
2735            has_good_result,
2736            "matched ToolResult (call_good) must be preserved in history"
2737        );
2738    }
2739
2740    /// Regression: a properly matched `tool_use`/`tool_result` pair must NOT be touched by the
2741    /// mid-history scan — ensures the fix doesn't break valid tool exchanges.
2742    #[tokio::test]
2743    async fn sanitize_tool_pairs_preserves_matched_tool_pair() {
2744        use zeph_llm::provider::MessagePart;
2745
2746        let provider = mock_provider(vec![]);
2747        let channel = MockChannel::new(vec![]);
2748        let registry = create_test_registry();
2749        let executor = MockToolExecutor::no_tools();
2750
2751        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2752        let cid = memory.sqlite().create_conversation().await.unwrap();
2753        let sqlite = memory.sqlite();
2754
2755        sqlite
2756            .save_message(cid, "user", "run a command")
2757            .await
2758            .unwrap();
2759
2760        // Assistant sends a ToolUse.
2761        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2762            id: "call_ok".to_string(),
2763            name: "shell".to_string(),
2764            input: serde_json::json!({"command": "echo hi"}),
2765        }])
2766        .unwrap();
2767        sqlite
2768            .save_message_with_parts(cid, "assistant", "[tool_use: shell]", &use_parts)
2769            .await
2770            .unwrap();
2771
2772        // Matching user ToolResult follows.
2773        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2774            tool_use_id: "call_ok".to_string(),
2775            content: "hi".to_string(),
2776            is_error: false,
2777        }])
2778        .unwrap();
2779        sqlite
2780            .save_message_with_parts(cid, "user", "[tool_result: call_ok]\nhi", &result_parts)
2781            .await
2782            .unwrap();
2783
2784        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2785
2786        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2787            std::sync::Arc::new(memory),
2788            cid,
2789            50,
2790            5,
2791            100,
2792        );
2793
2794        let messages_before = agent.msg.messages.len();
2795        agent.load_history().await.unwrap();
2796
2797        // All 4 messages preserved, tool_use parts intact.
2798        assert_eq!(
2799            agent.msg.messages.len(),
2800            messages_before + 4,
2801            "matched tool pair must not be removed"
2802        );
2803        let tool_msg = &agent.msg.messages[messages_before + 1];
2804        assert!(
2805            tool_msg
2806                .parts
2807                .iter()
2808                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "call_ok")),
2809            "matched ToolUse parts must be preserved"
2810        );
2811    }
2812
2813    /// RC5: `persist_cancelled_tool_results` must persist a tombstone user message containing
2814    /// `is_error=true` `ToolResult` parts for all `tool_calls` IDs so the preceding assistant
2815    /// `ToolUse` is never orphaned in the DB after a cancellation.
2816    #[tokio::test]
2817    async fn persist_cancelled_tool_results_pairs_tool_use() {
2818        use zeph_llm::provider::MessagePart;
2819
2820        let provider = mock_provider(vec![]);
2821        let channel = MockChannel::new(vec![]);
2822        let registry = create_test_registry();
2823        let executor = MockToolExecutor::no_tools();
2824
2825        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2826        let cid = memory.sqlite().create_conversation().await.unwrap();
2827
2828        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2829            std::sync::Arc::new(memory),
2830            cid,
2831            50,
2832            5,
2833            100,
2834        );
2835
2836        // Simulate: assistant message with two ToolUse parts already persisted.
2837        let tool_calls = vec![
2838            zeph_llm::provider::ToolUseRequest {
2839                id: "cancel_id_1".to_string(),
2840                name: "shell".to_string().into(),
2841                input: serde_json::json!({}),
2842            },
2843            zeph_llm::provider::ToolUseRequest {
2844                id: "cancel_id_2".to_string(),
2845                name: "read_file".to_string().into(),
2846                input: serde_json::json!({}),
2847            },
2848        ];
2849
2850        agent.persist_cancelled_tool_results(&tool_calls).await;
2851
2852        let history = agent
2853            .services
2854            .memory
2855            .persistence
2856            .memory
2857            .as_ref()
2858            .unwrap()
2859            .sqlite()
2860            .load_history(cid, 50)
2861            .await
2862            .unwrap();
2863
2864        // Exactly one user message must have been persisted.
2865        assert_eq!(history.len(), 1);
2866        assert_eq!(history[0].role, Role::User);
2867
2868        // It must contain is_error=true ToolResult for each tool call ID.
2869        for tc in &tool_calls {
2870            assert!(
2871                history[0].parts.iter().any(|p| matches!(
2872                    p,
2873                    MessagePart::ToolResult { tool_use_id, is_error, .. }
2874                        if tool_use_id == &tc.id && *is_error
2875                )),
2876                "tombstone ToolResult for {} must be present and is_error=true",
2877                tc.id
2878            );
2879        }
2880    }
2881
2882    // ---- Integration tests for the #2529 fix: soft-delete of legacy-content orphans ----
2883
2884    /// #2529 regression: orphaned assistant `ToolUse` + user `ToolResult` pair where BOTH messages
2885    /// have content consisting solely of legacy tool bracket strings (no human-readable text).
2886    ///
2887    /// Before the fix, `content.trim().is_empty()` returned false for these messages, so they
2888    /// were never soft-deleted and the WARN log repeated on every session restart.
2889    ///
2890    /// After the fix, `has_meaningful_content` returns false for legacy-only content, so both
2891    /// orphaned messages are soft-deleted (non-null `deleted_at`) in `SQLite`.
2892    #[tokio::test]
2893    async fn issue_2529_orphaned_legacy_content_pair_is_soft_deleted() {
2894        use zeph_llm::provider::MessagePart;
2895
2896        let provider = mock_provider(vec![]);
2897        let channel = MockChannel::new(vec![]);
2898        let registry = create_test_registry();
2899        let executor = MockToolExecutor::no_tools();
2900
2901        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
2902        let cid = memory.sqlite().create_conversation().await.unwrap();
2903        let sqlite = memory.sqlite();
2904
2905        // A normal user message that anchors the conversation.
2906        sqlite
2907            .save_message(cid, "user", "save this for me")
2908            .await
2909            .unwrap();
2910
2911        // Orphaned assistant[ToolUse]: content is ONLY a legacy tool tag — no matching
2912        // ToolResult follows. This is the exact pattern that triggered #2529.
2913        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
2914            id: "call_2529".to_string(),
2915            name: "memory_save".to_string(),
2916            input: serde_json::json!({"content": "save this"}),
2917        }])
2918        .unwrap();
2919        let orphan_assistant_id = sqlite
2920            .save_message_with_parts(
2921                cid,
2922                "assistant",
2923                "[tool_use: memory_save(call_2529)]",
2924                &use_parts,
2925            )
2926            .await
2927            .unwrap();
2928
2929        // Orphaned user[ToolResult]: content is ONLY a legacy tool tag + body.
2930        // Its tool_use_id ("call_2529") does not match any ToolUse in the preceding assistant
2931        // message in this position (will be made orphaned by inserting a plain assistant message
2932        // between them to break the pair).
2933        sqlite
2934            .save_message(cid, "assistant", "here is a plain reply")
2935            .await
2936            .unwrap();
2937
2938        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
2939            tool_use_id: "call_2529".to_string(),
2940            content: "saved".to_string(),
2941            is_error: false,
2942        }])
2943        .unwrap();
2944        let orphan_user_id = sqlite
2945            .save_message_with_parts(
2946                cid,
2947                "user",
2948                "[tool_result: call_2529]\nsaved",
2949                &result_parts,
2950            )
2951            .await
2952            .unwrap();
2953
2954        // Final plain message so history doesn't end on the orphan.
2955        sqlite.save_message(cid, "assistant", "done").await.unwrap();
2956
2957        let memory_arc = std::sync::Arc::new(memory);
2958        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
2959            memory_arc.clone(),
2960            cid,
2961            50,
2962            5,
2963            100,
2964        );
2965
2966        agent.load_history().await.unwrap();
2967
2968        // Verify that both orphaned messages now have `deleted_at IS NOT NULL` in SQLite.
2969        // COUNT(*) WHERE deleted_at IS NOT NULL returns 1 if soft-deleted, 0 otherwise.
2970        let assistant_deleted_count: Vec<i64> = zeph_db::query_scalar(
2971            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
2972        )
2973        .bind(orphan_assistant_id)
2974        .fetch_all(memory_arc.sqlite().pool())
2975        .await
2976        .unwrap();
2977
2978        let user_deleted_count: Vec<i64> = zeph_db::query_scalar(
2979            "SELECT COUNT(*) FROM messages WHERE id = ? AND deleted_at IS NOT NULL",
2980        )
2981        .bind(orphan_user_id)
2982        .fetch_all(memory_arc.sqlite().pool())
2983        .await
2984        .unwrap();
2985
2986        assert_eq!(
2987            assistant_deleted_count.first().copied().unwrap_or(0),
2988            1,
2989            "orphaned assistant[ToolUse] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
2990        );
2991        assert_eq!(
2992            user_deleted_count.first().copied().unwrap_or(0),
2993            1,
2994            "orphaned user[ToolResult] with legacy-only content must be soft-deleted (deleted_at IS NOT NULL)"
2995        );
2996    }
2997
2998    /// #2529 idempotency: after soft-delete on first `load_history`, a second call must not
2999    /// re-load the soft-deleted orphans. This ensures the WARN log does not repeat on the
3000    /// next session startup for the same orphaned messages.
3001    #[tokio::test]
3002    async fn issue_2529_soft_delete_is_idempotent_across_sessions() {
3003        use zeph_llm::provider::MessagePart;
3004
3005        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3006        let cid = memory.sqlite().create_conversation().await.unwrap();
3007        let sqlite = memory.sqlite();
3008
3009        // Normal anchor message.
3010        sqlite
3011            .save_message(cid, "user", "do something")
3012            .await
3013            .unwrap();
3014
3015        // Orphaned assistant[ToolUse] with legacy-only content.
3016        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3017            id: "call_idem".to_string(),
3018            name: "shell".to_string(),
3019            input: serde_json::json!({"command": "ls"}),
3020        }])
3021        .unwrap();
3022        sqlite
3023            .save_message_with_parts(cid, "assistant", "[tool_use: shell(call_idem)]", &use_parts)
3024            .await
3025            .unwrap();
3026
3027        // Break the pair: insert a plain assistant message so the ToolUse is mid-history orphan.
3028        sqlite
3029            .save_message(cid, "assistant", "continuing")
3030            .await
3031            .unwrap();
3032
3033        // Orphaned user[ToolResult] with legacy-only content.
3034        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3035            tool_use_id: "call_idem".to_string(),
3036            content: "output".to_string(),
3037            is_error: false,
3038        }])
3039        .unwrap();
3040        sqlite
3041            .save_message_with_parts(
3042                cid,
3043                "user",
3044                "[tool_result: call_idem]\noutput",
3045                &result_parts,
3046            )
3047            .await
3048            .unwrap();
3049
3050        sqlite
3051            .save_message(cid, "assistant", "final")
3052            .await
3053            .unwrap();
3054
3055        let memory_arc = std::sync::Arc::new(memory);
3056
3057        // First session: load_history performs soft-delete of the orphaned pair.
3058        let mut agent1 = Agent::new(
3059            mock_provider(vec![]),
3060            MockChannel::new(vec![]),
3061            create_test_registry(),
3062            None,
3063            5,
3064            MockToolExecutor::no_tools(),
3065        )
3066        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3067        agent1.load_history().await.unwrap();
3068        let count_after_first = agent1.msg.messages.len();
3069
3070        // Second session: a fresh agent loading the same conversation must not see the
3071        // soft-deleted orphans — the WARN log must not repeat.
3072        let mut agent2 = Agent::new(
3073            mock_provider(vec![]),
3074            MockChannel::new(vec![]),
3075            create_test_registry(),
3076            None,
3077            5,
3078            MockToolExecutor::no_tools(),
3079        )
3080        .with_memory(memory_arc.clone(), cid, 50, 5, 100);
3081        agent2.load_history().await.unwrap();
3082        let count_after_second = agent2.msg.messages.len();
3083
3084        // Both sessions must load the same number of messages — soft-deleted orphans excluded.
3085        assert_eq!(
3086            count_after_first, count_after_second,
3087            "second load_history must load the same message count as the first (soft-deleted orphans excluded)"
3088        );
3089    }
3090
3091    /// Edge case for #2529: an orphaned assistant message whose content has BOTH meaningful text
3092    /// AND a `tool_use` tag must NOT be soft-deleted. The `ToolUse` parts are stripped but the
3093    /// message is kept because it has human-readable content.
3094    #[tokio::test]
3095    async fn issue_2529_message_with_text_and_tool_tag_is_kept_after_part_strip() {
3096        use zeph_llm::provider::MessagePart;
3097
3098        let provider = mock_provider(vec![]);
3099        let channel = MockChannel::new(vec![]);
3100        let registry = create_test_registry();
3101        let executor = MockToolExecutor::no_tools();
3102
3103        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3104        let cid = memory.sqlite().create_conversation().await.unwrap();
3105        let sqlite = memory.sqlite();
3106
3107        // Normal first exchange.
3108        sqlite
3109            .save_message(cid, "user", "check the files")
3110            .await
3111            .unwrap();
3112
3113        // Assistant message: has BOTH meaningful text AND a ToolUse part.
3114        // Content contains real prose + legacy tag — has_meaningful_content must return true.
3115        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3116            id: "call_mixed".to_string(),
3117            name: "shell".to_string(),
3118            input: serde_json::json!({"command": "ls"}),
3119        }])
3120        .unwrap();
3121        sqlite
3122            .save_message_with_parts(
3123                cid,
3124                "assistant",
3125                "Let me list the directory. [tool_use: shell(call_mixed)]",
3126                &use_parts,
3127            )
3128            .await
3129            .unwrap();
3130
3131        // No matching ToolResult follows — the ToolUse is orphaned.
3132        sqlite.save_message(cid, "user", "thanks").await.unwrap();
3133        sqlite
3134            .save_message(cid, "assistant", "you are welcome")
3135            .await
3136            .unwrap();
3137
3138        let memory_arc = std::sync::Arc::new(memory);
3139        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3140            memory_arc.clone(),
3141            cid,
3142            50,
3143            5,
3144            100,
3145        );
3146
3147        let messages_before = agent.msg.messages.len();
3148        agent.load_history().await.unwrap();
3149
3150        // All 4 messages must be present — the mixed-content assistant message is KEPT.
3151        assert_eq!(
3152            agent.msg.messages.len(),
3153            messages_before + 4,
3154            "assistant message with text + tool tag must not be removed after ToolUse strip"
3155        );
3156
3157        // The mixed-content assistant message must have its ToolUse parts stripped.
3158        let mixed_msg = agent
3159            .msg
3160            .messages
3161            .iter()
3162            .find(|m| m.content.contains("Let me list the directory"))
3163            .expect("mixed-content assistant message must still be in history");
3164        assert!(
3165            !mixed_msg
3166                .parts
3167                .iter()
3168                .any(|p| matches!(p, MessagePart::ToolUse { .. })),
3169            "orphaned ToolUse parts must be stripped even when message has meaningful text"
3170        );
3171        assert_eq!(
3172            mixed_msg.content, "Let me list the directory. [tool_use: shell(call_mixed)]",
3173            "content field must be unchanged — only parts are stripped"
3174        );
3175    }
3176
3177    // ── [skipped]/[stopped] ToolResult embedding guard (#2620) ──────────────
3178
3179    #[tokio::test]
3180    async fn persist_message_skipped_tool_result_does_not_embed() {
3181        use zeph_llm::provider::MessagePart;
3182
3183        let provider = mock_provider(vec![]);
3184        let channel = MockChannel::new(vec![]);
3185        let registry = create_test_registry();
3186        let executor = MockToolExecutor::no_tools();
3187
3188        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3189        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3190        let cid = memory.sqlite().create_conversation().await.unwrap();
3191
3192        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3193            .with_metrics(tx)
3194            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3195        agent.services.memory.persistence.autosave_assistant = true;
3196        agent.services.memory.persistence.autosave_min_length = 0;
3197
3198        let parts = vec![MessagePart::ToolResult {
3199            tool_use_id: "tu1".into(),
3200            content: "[skipped] bash tool was blocked by utility gate".into(),
3201            is_error: false,
3202        }];
3203
3204        agent
3205            .persist_message(
3206                Role::User,
3207                "[skipped] bash tool was blocked by utility gate",
3208                &parts,
3209                false,
3210            )
3211            .await;
3212
3213        assert_eq!(
3214            rx.borrow().embeddings_generated,
3215            0,
3216            "[skipped] ToolResult must not be embedded into Qdrant"
3217        );
3218    }
3219
3220    #[tokio::test]
3221    async fn persist_message_stopped_tool_result_does_not_embed() {
3222        use zeph_llm::provider::MessagePart;
3223
3224        let provider = mock_provider(vec![]);
3225        let channel = MockChannel::new(vec![]);
3226        let registry = create_test_registry();
3227        let executor = MockToolExecutor::no_tools();
3228
3229        let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
3230        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3231        let cid = memory.sqlite().create_conversation().await.unwrap();
3232
3233        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
3234            .with_metrics(tx)
3235            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100);
3236        agent.services.memory.persistence.autosave_assistant = true;
3237        agent.services.memory.persistence.autosave_min_length = 0;
3238
3239        let parts = vec![MessagePart::ToolResult {
3240            tool_use_id: "tu2".into(),
3241            content: "[stopped] execution limit reached".into(),
3242            is_error: false,
3243        }];
3244
3245        agent
3246            .persist_message(
3247                Role::User,
3248                "[stopped] execution limit reached",
3249                &parts,
3250                false,
3251            )
3252            .await;
3253
3254        assert_eq!(
3255            rx.borrow().embeddings_generated,
3256            0,
3257            "[stopped] ToolResult must not be embedded into Qdrant"
3258        );
3259    }
3260
3261    #[tokio::test]
3262    async fn persist_message_normal_tool_result_is_saved_not_blocked_by_guard() {
3263        // Regression: a normal ToolResult (no [skipped]/[stopped] prefix) must not be
3264        // suppressed by the utility-gate guard and must reach the save path (SQLite).
3265        use zeph_llm::provider::MessagePart;
3266
3267        let provider = mock_provider(vec![]);
3268        let channel = MockChannel::new(vec![]);
3269        let registry = create_test_registry();
3270        let executor = MockToolExecutor::no_tools();
3271
3272        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3273        let cid = memory.sqlite().create_conversation().await.unwrap();
3274        let memory_arc = std::sync::Arc::new(memory);
3275
3276        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3277            memory_arc.clone(),
3278            cid,
3279            50,
3280            5,
3281            100,
3282        );
3283        agent.services.memory.persistence.autosave_assistant = true;
3284        agent.services.memory.persistence.autosave_min_length = 0;
3285
3286        let content = "total 42\ndrwxr-xr-x  5 user group";
3287        let parts = vec![MessagePart::ToolResult {
3288            tool_use_id: "tu3".into(),
3289            content: content.into(),
3290            is_error: false,
3291        }];
3292
3293        agent
3294            .persist_message(Role::User, content, &parts, false)
3295            .await;
3296
3297        // Must be saved to SQLite — confirms the guard did not block this path.
3298        let history = memory_arc.sqlite().load_history(cid, 50).await.unwrap();
3299        assert_eq!(
3300            history.len(),
3301            1,
3302            "normal ToolResult must be saved to SQLite"
3303        );
3304        assert_eq!(history[0].content, content);
3305    }
3306
3307    /// Verify that `enqueue_trajectory_extraction_task` uses a bounded tail slice instead of
3308    /// cloning the full message vec. We confirm the slice logic by checking that the
3309    /// `tail_start` calculation correctly bounds the window even with more messages than
3310    /// `max_messages`.
3311    #[test]
3312    fn trajectory_extraction_slice_bounds_messages() {
3313        // Replicate the slice logic from enqueue_trajectory_extraction_task.
3314        let max_messages: usize = 20;
3315        let total_messages = 100usize;
3316
3317        let tail_start = total_messages.saturating_sub(max_messages);
3318        let window = total_messages - tail_start;
3319
3320        assert_eq!(
3321            window, 20,
3322            "slice should contain exactly max_messages items"
3323        );
3324        assert_eq!(tail_start, 80, "slice should start at len - max_messages");
3325    }
3326
3327    #[test]
3328    fn trajectory_extraction_slice_handles_few_messages() {
3329        let max_messages: usize = 20;
3330        let total_messages = 5usize;
3331
3332        let tail_start = total_messages.saturating_sub(max_messages);
3333        let window = total_messages - tail_start;
3334
3335        assert_eq!(window, 5, "should return all messages when fewer than max");
3336        assert_eq!(tail_start, 0, "slice should start from the beginning");
3337    }
3338
3339    // --- #3168 regression tests ---
3340
3341    /// Round-trip: persist `Assistant[tool_use]` + `User[tool_result]`, then `load_history`.
3342    /// After `sanitize_tool_pairs` the pair must be intact — no WARN, both messages present
3343    /// with non-empty parts.
3344    #[tokio::test]
3345    async fn regression_3168_complete_tool_pair_survives_round_trip() {
3346        use zeph_llm::provider::MessagePart;
3347
3348        let provider = mock_provider(vec![]);
3349        let channel = MockChannel::new(vec![]);
3350        let registry = create_test_registry();
3351        let executor = MockToolExecutor::no_tools();
3352
3353        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3354        let cid = memory.sqlite().create_conversation().await.unwrap();
3355        let sqlite = memory.sqlite();
3356
3357        let use_parts = serde_json::to_string(&[MessagePart::ToolUse {
3358            id: "r3168_call".to_string(),
3359            name: "shell".to_string(),
3360            input: serde_json::json!({"command": "echo hi"}),
3361        }])
3362        .unwrap();
3363        sqlite
3364            .save_message_with_parts(
3365                cid,
3366                "assistant",
3367                "[tool_use: shell(r3168_call)]",
3368                &use_parts,
3369            )
3370            .await
3371            .unwrap();
3372
3373        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3374            tool_use_id: "r3168_call".to_string(),
3375            content: "[skipped]".to_string(),
3376            is_error: false,
3377        }])
3378        .unwrap();
3379        sqlite
3380            .save_message_with_parts(cid, "user", "[tool_result: r3168_call]", &result_parts)
3381            .await
3382            .unwrap();
3383
3384        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3385            std::sync::Arc::new(memory),
3386            cid,
3387            50,
3388            5,
3389            100,
3390        );
3391
3392        let base = agent.msg.messages.len();
3393        agent.load_history().await.unwrap();
3394
3395        assert_eq!(
3396            agent.msg.messages.len(),
3397            base + 2,
3398            "both messages of the complete pair must survive load_history"
3399        );
3400
3401        let assistant_msg = agent
3402            .msg
3403            .messages
3404            .iter()
3405            .find(|m| m.role == Role::Assistant)
3406            .expect("assistant message missing after load_history");
3407        assert!(
3408            assistant_msg
3409                .parts
3410                .iter()
3411                .any(|p| matches!(p, MessagePart::ToolUse { id, .. } if id == "r3168_call")),
3412            "ToolUse part must be preserved in assistant message"
3413        );
3414
3415        let user_msg = agent
3416            .msg
3417            .messages
3418            .iter()
3419            .rev()
3420            .find(|m| m.role == Role::User)
3421            .expect("user message missing after load_history");
3422        assert!(
3423            user_msg.parts.iter().any(|p| matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "r3168_call")),
3424            "ToolResult part must be preserved in user message"
3425        );
3426    }
3427
3428    /// If parts serialization fails, `persist_message` must return early and not store
3429    /// a row with empty parts that would create an orphaned `tool_use` on next session load.
3430    /// We verify this by writing a row with invalid `parts_json` directly and confirming
3431    /// `load_history` skips it (empty parts row is not injected into the agent).
3432    #[tokio::test]
3433    async fn regression_3168_corrupt_parts_row_skipped_on_load() {
3434        use zeph_llm::provider::MessagePart;
3435
3436        let provider = mock_provider(vec![]);
3437        let channel = MockChannel::new(vec![]);
3438        let registry = create_test_registry();
3439        let executor = MockToolExecutor::no_tools();
3440
3441        let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await;
3442        let cid = memory.sqlite().create_conversation().await.unwrap();
3443        let sqlite = memory.sqlite();
3444
3445        // Simulate the pre-fix bug: assistant message stored with empty parts_json "[]"
3446        // even though it should have had a ToolUse part. This is what persist_message used
3447        // to do before the early-return fix.
3448        sqlite
3449            .save_message_with_parts(cid, "assistant", "[tool_use: shell(corrupt)]", "[]")
3450            .await
3451            .unwrap();
3452
3453        // User message with the matching tool_result stored correctly.
3454        let result_parts = serde_json::to_string(&[MessagePart::ToolResult {
3455            tool_use_id: "corrupt".to_string(),
3456            content: "result".to_string(),
3457            is_error: false,
3458        }])
3459        .unwrap();
3460        sqlite
3461            .save_message_with_parts(cid, "user", "[tool_result: corrupt]", &result_parts)
3462            .await
3463            .unwrap();
3464
3465        let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
3466            std::sync::Arc::new(memory),
3467            cid,
3468            50,
3469            5,
3470            100,
3471        );
3472
3473        let base = agent.msg.messages.len();
3474        agent.load_history().await.unwrap();
3475
3476        // The user ToolResult has no matching ToolUse in the assistant message (parts="[]"),
3477        // so sanitize_tool_pairs must strip the orphaned ToolResult.
3478        // Net result: only the content-only assistant message survives; user msg is removed
3479        // because after stripping its only part it becomes empty.
3480        let loaded = agent.msg.messages.len() - base;
3481        // No message injected with orphaned ToolResult parts — either stripped entirely or
3482        // the ToolResult part removed. Verify no user message with ToolResult remains.
3483        let orphan_present = agent.msg.messages.iter().any(|m| {
3484            m.role == Role::User
3485                && m.parts.iter().any(|p| {
3486                    matches!(p, MessagePart::ToolResult { tool_use_id, .. } if tool_use_id == "corrupt")
3487                })
3488        });
3489        assert!(
3490            !orphan_present,
3491            "orphaned ToolResult must not survive load_history; loaded={loaded}"
3492        );
3493    }
3494}