Skip to main content

solo_steward/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Solo steward: consolidation pass (SWS-equivalent dedup, REM-equivalent
4//! integration, decay sweep, reconsolidation).
5//!
6//! Per ADR-0002, `Steward` is a struct, not a trait. The LLM backend is
7//! the swap point (via `solo_core::LlmClient`); the consolidation logic
8//! lives in one place.
9//!
//! ## Pipeline stages
//!
//!   - **`cluster_episodes`**: pure-deterministic SWS pass. See
//!     [`cluster`] for the algorithm + tests. Does not call the LLM and
//!     does not touch the DB — the caller pairs `(Episode, Embedding)`
//!     from SQL upstream.
//!   - **`abstract_cluster`**: REM-equivalent LLM call that produces a
//!     [`SemanticAbstraction`] for each cluster. Implemented in
//!     [`abstraction`].
//!   - **`detect_contradiction`**: cheap rule filter (subject +
//!     predicate + temporal overlap) followed by an LLM judge for the
//!     pairs that survive. Implemented in [`contradiction`].
21//!
22//! Storage wiring (writer → cluster → SQL persistence) lands in
23//! `solo-storage` separately.
24
25use std::sync::Arc;
26use std::time::Duration;
27
28use solo_core::{
29    Cluster, Contradiction, Embedding, Episode, Error, LlmClient, Result, SemanticAbstraction,
30    Triple,
31};
32
/// Name of the environment variable that overrides
/// [`StewardConfig::cluster_cosine_threshold`].
///
/// A single knob deliberately drives every centroid-cosine comparison —
/// clustering, existing-merge, and merge-candidate counting — so the
/// writer's existing-merge and the doctor's count stay in lock-step (the
/// SQL-sync invariant described in the `solo_storage::merge_candidates`
/// module docs).
pub const ENV_CLUSTER_COSINE_THRESHOLD: &str = "SOLO_CLUSTER_COSINE_THRESHOLD";
40
/// Env var name for tuning the minimum cluster size (number of
/// episodes) below which a candidate cluster is discarded by the
/// SWS-equivalent clustering pass. Must be a positive integer.
/// When unset, the value from [`StewardConfig::default`] applies
/// (currently `2`).
pub const ENV_CLUSTER_MIN_SIZE: &str = "SOLO_CLUSTER_MIN_SIZE";
46
/// Name of the environment variable that overrides
/// [`StewardConfig::abstraction_max_tokens`], the `max_tokens` value
/// requested by the abstraction LLM call.
///
/// Accepted values are integers in `[1, 65_536]`. The ceiling exists to
/// catch typos such as `1_000_000` that would quietly inflate the cost of
/// every call. Default: 512.
pub const ENV_ABSTRACTION_MAX_TOKENS: &str = "SOLO_ABSTRACTION_MAX_TOKENS";
54
/// Name of the environment variable that toggles contradiction detection
/// ([`StewardConfig::contradiction_check_enabled`]).
///
/// Accepts `true` / `false` in any letter case and defaults to `true`.
/// Setting it to `false` makes the consolidation pass skip both the rule
/// filter and the LLM judge, leaving `contradictions_found` at 0. This is
/// a trade-off for cost-sensitive operators who can live without
/// contradiction tracking.
pub const ENV_CONTRADICTION_CHECK_ENABLED: &str = "SOLO_CONTRADICTION_CHECK_ENABLED";
63
64pub mod abstraction;
65pub mod cluster;
66pub mod contradiction;
67
68#[cfg(any(test, feature = "test-support"))]
69pub mod test_support;
70
71pub struct Steward {
72    client: Arc<dyn LlmClient>,
73    config: StewardConfig,
74}
75
/// Tunables for the consolidation pass.
///
/// Build one with [`StewardConfig::default`], [`StewardConfig::from_env`],
/// or [`StewardConfig::from_settings_then_env`].
#[derive(Debug, Clone)]
pub struct StewardConfig {
    /// Minimum number of episodes a candidate cluster needs to survive the
    /// clustering pass.
    pub cluster_min_size: usize,
    /// Centroid-cosine threshold shared by clustering and the merge checks.
    pub cluster_cosine_threshold: f32,
    /// `max_tokens` requested by the abstraction LLM call.
    pub abstraction_max_tokens: usize,
    /// When `false`, the contradiction rule filter and LLM judge are skipped.
    pub contradiction_check_enabled: bool,
}

impl Default for StewardConfig {
    fn default() -> Self {
        // 2 rather than the original 3: at 3, a personal user with sparse
        // memory (≤30 episodes) can go weeks without seeing a single
        // cluster form. 3 remains a sensible choice at scale.
        let cluster_min_size = 2;

        // 0.55 rather than the original 0.85, to suit the bundled
        // `all-MiniLM-L6-v2` embedder. Its similarity scores are
        // conservative: topically related pairs land around 0.45-0.65, not
        // 0.80+. At 0.85 (tuned for a stronger embedder) only
        // near-paraphrases matched and the consolidate pass starved.
        //
        // Since v0.11.1 both clustering knobs can also be set under
        // `[steward]` in `solo.config.toml` (`solo_storage::StewardSettings`).
        // The `SOLO_CLUSTER_*` env vars still take precedence over TOML.
        // The values here are the floor when neither source sets the knob.
        let cluster_cosine_threshold = 0.55;

        let abstraction_max_tokens = 512;
        let contradiction_check_enabled = true;

        Self {
            cluster_min_size,
            cluster_cosine_threshold,
            abstraction_max_tokens,
            contradiction_check_enabled,
        }
    }
}
112
113/// v0.10.1 (P4 audit m5): outcome of [`Steward::extract_triples_batch`].
114///
115/// Carries both the per-cluster successes (each abstraction is keyed by
116/// its cluster's `MemoryId`) AND the count of clusters that timed out
117/// during their per-cluster LLM call. The deferred count surfaces in
118/// the writer-actor's `MemoryTriplesExtract` audit row as
119/// `details_json.clusters_deferred`, giving operators an explicit
120/// "something is slow" signal in the audit log without having to grep
121/// `tracing::warn!` lines.
122///
123/// Pure-Rust failures (LLM returns an error result, not a timeout) are
124/// LOGGED but NOT counted in `deferred_count`. They're the implicit
125/// `clusters_failed` half of the audit row, computed by the writer as
126/// `cluster_count - abstractions_built - clusters_deferred`. The split
127/// is operator-actionable: deferred → slow backend, failed → backend
128/// rejected the prompt.
129#[derive(Debug, Default, Clone)]
130pub struct ExtractTriplesBatchOutcome {
131    /// One entry per cluster whose `abstract_cluster` call succeeded.
132    /// Iteration order matches the input order; the daemon-side
133    /// `AttachAbstractionBatch` handler persists these in a single
134    /// outer tx (with per-cluster `SAVEPOINT`s, per the v0.9.0 P4-
135    /// revision M2 hardening).
136    pub abstractions: Vec<(solo_core::MemoryId, SemanticAbstraction)>,
137    /// Number of clusters whose `abstract_cluster` call exceeded
138    /// `per_cluster_timeout`. Not present in `abstractions`; the
139    /// cluster's lack of a `semantic_abstractions` row will re-select
140    /// it on the next batch tick.
141    pub deferred_count: usize,
142}
143
/// Look up `name` in the process environment and return its value with
/// surrounding whitespace removed.
///
/// Yields `None` when the variable is missing, is not valid Unicode, or is
/// blank after trimming. Blank counts as unset because some shells express
/// "leave it at the default" by assigning an empty string. That empty value
/// should not be pushed through the parsers as if it were real input.
fn env_trimmed(name: &str) -> Option<String> {
    let value = std::env::var(name).ok()?;
    let value = value.trim();
    if value.is_empty() {
        return None;
    }
    Some(value.to_owned())
}
159
160impl StewardConfig {
161    /// Build a config from process env vars, falling back to
162    /// [`StewardConfig::default`] for unset / empty fields.
163    ///
164    /// Honoured env vars (all optional; unset → default):
165    ///
166    ///   - [`ENV_CLUSTER_COSINE_THRESHOLD`] — finite f32 in
167    ///     `(0.0, 1.0]`. Default `0.85`. Affects writer's clustering
168    ///     pass, in-run merge, existing-vs-existing merge, and
169    ///     doctor's merge-candidate count uniformly (SQL+threshold-
170    ///     sync invariant — see `solo_storage::merge_candidates`).
171    ///   - [`ENV_CLUSTER_MIN_SIZE`] — positive integer. Default `3`.
172    ///     Clusters with fewer episodes than this are discarded by
173    ///     the SWS-equivalent clustering pass.
174    ///   - [`ENV_ABSTRACTION_MAX_TOKENS`] — positive integer in
175    ///     `[1, 65_536]`. Default `512`. The `max_tokens` value the
176    ///     abstraction LLM call sends. Upper bound catches typo'd
177    ///     values that would silently inflate per-call cost.
178    ///   - [`ENV_CONTRADICTION_CHECK_ENABLED`] — `true` or `false`,
179    ///     case-insensitive. Default `true`. When `false`, skips
180    ///     the contradiction rule filter + LLM judge entirely.
181    ///
182    /// Returns `Err(Error::InvalidInput)` if any env var is set but
183    /// can't be parsed or is out of range. Operators would rather
184    /// see early failure than silently fall back to defaults (which
185    /// would mask a typo'd value). All-or-nothing: if any field
186    /// fails validation, the whole call errors and the prior fields'
187    /// successful parses don't take effect — daemon/consolidate
188    /// startup fails, operator fixes the env, retries.
189    pub fn from_env() -> Result<Self> {
190        let mut cfg = Self::default();
191
192        if let Some(raw) = env_trimmed(ENV_CLUSTER_COSINE_THRESHOLD) {
193            let parsed: f32 = raw.parse().map_err(|_| {
194                Error::invalid_input(format!(
195                    "{ENV_CLUSTER_COSINE_THRESHOLD}: not a valid f32 ({raw:?})"
196                ))
197            })?;
198            if !parsed.is_finite() || parsed <= 0.0 || parsed > 1.0 {
199                return Err(Error::invalid_input(format!(
200                    "{ENV_CLUSTER_COSINE_THRESHOLD}: must be a finite f32 in (0.0, 1.0], got {parsed}"
201                )));
202            }
203            cfg.cluster_cosine_threshold = parsed;
204        }
205
206        if let Some(raw) = env_trimmed(ENV_CLUSTER_MIN_SIZE) {
207            let parsed: usize = raw.parse().map_err(|_| {
208                Error::invalid_input(format!(
209                    "{ENV_CLUSTER_MIN_SIZE}: not a valid non-negative integer ({raw:?})"
210                ))
211            })?;
212            if parsed < 1 {
213                return Err(Error::invalid_input(format!(
214                    "{ENV_CLUSTER_MIN_SIZE}: must be >= 1, got {parsed}"
215                )));
216            }
217            cfg.cluster_min_size = parsed;
218        }
219
220        if let Some(raw) = env_trimmed(ENV_ABSTRACTION_MAX_TOKENS) {
221            let parsed: usize = raw.parse().map_err(|_| {
222                Error::invalid_input(format!(
223                    "{ENV_ABSTRACTION_MAX_TOKENS}: not a valid non-negative integer ({raw:?})"
224                ))
225            })?;
226            if !(1..=65_536).contains(&parsed) {
227                return Err(Error::invalid_input(format!(
228                    "{ENV_ABSTRACTION_MAX_TOKENS}: must be in [1, 65536], got {parsed}"
229                )));
230            }
231            cfg.abstraction_max_tokens = parsed;
232        }
233
234        if let Some(raw) = env_trimmed(ENV_CONTRADICTION_CHECK_ENABLED) {
235            let parsed = match raw.to_ascii_lowercase().as_str() {
236                "true" => true,
237                "false" => false,
238                _ => {
239                    return Err(Error::invalid_input(format!(
240                        "{ENV_CONTRADICTION_CHECK_ENABLED}: must be \"true\" or \"false\" (case-insensitive), got {raw:?}"
241                    )));
242                }
243            };
244            cfg.contradiction_check_enabled = parsed;
245        }
246
247        Ok(cfg)
248    }
249
250    /// v0.11.1: Build a config by layering TOML overrides under env-var
251    /// overrides, both on top of [`StewardConfig::default`]. Used by the
252    /// daemon and one-shot CLI paths so an operator can pin
253    /// `cluster_min_size` + `cluster_cosine_threshold` in
254    /// `solo.config.toml` and still let `SOLO_CLUSTER_*` env vars win
255    /// per-runtime (the v0.10.x escape-hatch contract).
256    ///
257    /// Resolution order (lowest precedence first):
258    ///   1. [`StewardConfig::default`]
259    ///   2. TOML `[steward]` block — `Some(_)` values from the supplied
260    ///      `toml_min_size` / `toml_cosine_threshold`
261    ///   3. Env vars — `SOLO_CLUSTER_MIN_SIZE` / `SOLO_CLUSTER_COSINE_THRESHOLD`
262    ///      via [`Self::from_env`]
263    ///
264    /// The two TOML values are passed by value rather than wrapped in a
265    /// dedicated struct so `solo-steward` doesn't depend on
266    /// `solo-storage::config::StewardSettings` (which would be circular
267    /// — `solo-storage` already deps on `solo-steward`). The
268    /// `solo-storage` side owns the TOML schema; this constructor's
269    /// signature is the carry-the-Option boundary.
270    ///
271    /// Validation: both TOML overrides are validated to the same bounds
272    /// as the env vars (`cluster_min_size >= 1`,
273    /// `cluster_cosine_threshold` finite in `(0.0, 1.0]`). A bad TOML
274    /// value surfaces as `Error::InvalidInput` with the bound and the
275    /// offending value, matching the env-var error messages so the
276    /// operator sees the same diagnostic regardless of which surface
277    /// they used.
278    pub fn from_settings_then_env(
279        toml_min_size: Option<usize>,
280        toml_cosine_threshold: Option<f32>,
281    ) -> Result<Self> {
282        let mut cfg = Self::default();
283
284        if let Some(parsed) = toml_min_size {
285            if parsed < 1 {
286                return Err(Error::invalid_input(format!(
287                    "[steward] cluster_min_size: must be >= 1, got {parsed}"
288                )));
289            }
290            cfg.cluster_min_size = parsed;
291        }
292
293        if let Some(parsed) = toml_cosine_threshold {
294            if !parsed.is_finite() || parsed <= 0.0 || parsed > 1.0 {
295                return Err(Error::invalid_input(format!(
296                    "[steward] cluster_cosine_threshold: must be a finite f32 in (0.0, 1.0], got {parsed}"
297                )));
298            }
299            cfg.cluster_cosine_threshold = parsed;
300        }
301
302        // Layer env vars on top. We re-run the env parser (rather than
303        // skip-to-fields) so `abstraction_max_tokens` /
304        // `contradiction_check_enabled` continue to read from env — the
305        // TOML surface only owns the clustering pair.
306        let env_cfg = Self::from_env()?;
307        // Preserve TOML cluster values UNLESS the env actually set
308        // something. `from_env()` starts from `default()` and only
309        // mutates fields whose env var was set, so equality-with-default
310        // is the "env didn't touch this" signal.
311        let env_default = Self::default();
312        if (env_cfg.cluster_min_size, env_cfg.cluster_cosine_threshold)
313            != (env_default.cluster_min_size, env_default.cluster_cosine_threshold)
314        {
315            // At least one env var fired; take its values. The
316            // `from_env` parser left untouched fields at default —
317            // those are the ones where TOML should still rule.
318            if env_cfg.cluster_min_size != env_default.cluster_min_size {
319                cfg.cluster_min_size = env_cfg.cluster_min_size;
320            }
321            if env_cfg.cluster_cosine_threshold != env_default.cluster_cosine_threshold {
322                cfg.cluster_cosine_threshold = env_cfg.cluster_cosine_threshold;
323            }
324        }
325        // The non-cluster env-driven fields always take effect — TOML
326        // doesn't carry these, so `env_cfg`'s values (which equal default
327        // when env was unset) are the right source of truth.
328        cfg.abstraction_max_tokens = env_cfg.abstraction_max_tokens;
329        cfg.contradiction_check_enabled = env_cfg.contradiction_check_enabled;
330
331        Ok(cfg)
332    }
333}
334
335impl Steward {
336    pub fn new(client: Arc<dyn LlmClient>, config: StewardConfig) -> Self {
337        Self { client, config }
338    }
339
340    /// Borrow the configured thresholds. Useful for callers that want
341    /// to call `cluster::cluster_episodes` directly (e.g. tests, or
342    /// future fine-grained pipelines).
343    pub fn config(&self) -> &StewardConfig {
344        &self.config
345    }
346
347    /// True iff this `Steward`'s LLM client talks to a real backend
348    /// (Anthropic, OpenAI, Ollama, future candle/local). False for
349    /// stub clients used in tests.
350    ///
351    /// Callers gate LLM-dependent work on this so the system stays
352    /// usable in stub-only configurations: e.g. the writer's
353    /// contradiction sweep early-returns when this is `false`,
354    /// because a stub's canned response can't faithfully arbitrate
355    /// triples it has never seen. The cheap clustering pass and the
356    /// abstraction pass remain reachable through their own gates.
357    ///
358    /// Delegates to [`solo_core::LlmClient::is_real_llm`], the per-
359    /// backend distinction.
360    pub fn has_llm(&self) -> bool {
361        self.client.is_real_llm()
362    }
363
364    /// SWS-equivalent: cluster the supplied `(Episode, Embedding)`
365    /// pairs by UTC day + cosine threshold. Pure-deterministic; does
366    /// not call the LLM. See [`cluster::cluster_episodes`] for the
367    /// algorithm.
368    pub async fn cluster_episodes(
369        &self,
370        inputs: &[(Episode, Embedding)],
371    ) -> Result<Vec<Cluster>> {
372        // Trivial wrapper today; preserves the async signature for
373        // when the algorithm grows entity-aware bucketing that may
374        // need to fan out (e.g. small per-entity LLM checks).
375        cluster::cluster_episodes(inputs, &self.config)
376    }
377
378    /// REM-equivalent: ask the LLM to produce a semantic abstraction
379    /// of a cluster. Provenance is preserved — `derived_from`
380    /// references every source episode.
381    ///
382    /// `episodes` must contain (at minimum) every `MemoryId` in
383    /// `cluster.episode_ids`. The Steward intentionally does not
384    /// hold a DB handle (per ADR-0002 "Steward depends only on
385    /// solo-core"), so the caller pairs cluster + episodes upstream.
386    pub async fn abstract_cluster(
387        &self,
388        cluster: &Cluster,
389        episodes: &[Episode],
390    ) -> Result<SemanticAbstraction> {
391        abstraction::abstract_cluster(cluster, episodes, self.client.as_ref()).await
392    }
393
394    /// v0.9.0 P4c: batch entry-point for the daemon-side consolidate
395    /// timer. For each `(cluster, episodes)` pair, calls
396    /// [`Self::abstract_cluster`] and accumulates the resulting
397    /// `SemanticAbstraction` (with its embedded `triples`) into the
398    /// returned [`ExtractTriplesBatchOutcome::abstractions`].
399    ///
400    /// Per-cluster failures are LOGGED but do NOT abort the batch:
401    /// the daemon-side caller persists every successful abstraction
402    /// in a single `WriteCommand::AttachAbstractionBatch` tx. A
403    /// half-failed batch (some clusters succeeded, some failed) is
404    /// the documented behavior — the failed clusters retry on the
405    /// next tick. This matches the v0.8.x writer-actor's
406    /// per-cluster log-and-skip discipline; we just moved the loop.
407    ///
408    /// v0.10.1 (P4 audit m5): each per-cluster `abstract_cluster`
409    /// call is wrapped in [`tokio::time::timeout`] with the supplied
410    /// `per_cluster_timeout`. A hung LLM call (MCP peer that never
411    /// responds, slow Ollama model, network stall on Anthropic) no
412    /// longer blocks the next cluster — the timeout fires, the
413    /// cluster is counted as "deferred", the batch continues. The
414    /// deferred cluster's lack of a `semantic_abstractions` row means
415    /// the NEXT batch tick will pick it up again automatically (the
416    /// reader pool's `fetch_clusters_without_abstractions` query
417    /// selects on the same predicate).
418    ///
419    /// A `per_cluster_timeout` of `Duration::ZERO` DISABLES the
420    /// per-cluster timeout — every call runs to natural completion.
421    /// Symmetric with the `[triples] cluster_timeout_secs = 0`
422    /// operator escape hatch in `solo.config.toml`. Useful for
423    /// operators on very slow local backends; not recommended in
424    /// production (a single hung peer can stall the batch).
425    ///
426    /// Cancellation note (v0.10.1): when the timeout fires,
427    /// `tokio::time::timeout` drops the inner `abstract_cluster`
428    /// future. If the underlying LLM transport doesn't honor
429    /// cancellation (some HTTP clients with in-flight requests may
430    /// not), the actual RPC may complete in the background. The
431    /// response is discarded — no double-write — but the LLM-side
432    /// work is wasted. Acceptable for v0.10.1.
433    ///
434    /// Output shape: [`ExtractTriplesBatchOutcome`] carries the
435    /// abstractions vec + a `deferred_count` for the audit row. The
436    /// caller persists `semantic_abstractions(content, provenance,
437    /// confidence)` + N `triples` rows per entry, and threads
438    /// `deferred_count` into the `MemoryTriplesExtract` audit row's
439    /// `details_json.clusters_deferred`.
440    ///
441    /// `Steward::has_llm() == false` is a NO-OP fast path: returns
442    /// an outcome with empty abstractions + zero deferred. Mirrors
443    /// the v0.8.x writer-actor's "if no LLM wired, skip abstraction"
444    /// gate.
445    pub async fn extract_triples_batch(
446        &self,
447        clusters_with_episodes: Vec<(Cluster, Vec<Episode>)>,
448        per_cluster_timeout: Duration,
449    ) -> ExtractTriplesBatchOutcome {
450        if !self.has_llm() {
451            return ExtractTriplesBatchOutcome::default();
452        }
453        let mut abstractions: Vec<(solo_core::MemoryId, SemanticAbstraction)> =
454            Vec::with_capacity(clusters_with_episodes.len());
455        let mut deferred_count: usize = 0;
456        let timeout_enabled = !per_cluster_timeout.is_zero();
457        for (cluster, episodes) in clusters_with_episodes {
458            let cluster_id = cluster.cluster_id;
459            let call = self.abstract_cluster(&cluster, &episodes);
460            let result = if timeout_enabled {
461                match tokio::time::timeout(per_cluster_timeout, call).await {
462                    Ok(inner) => inner,
463                    Err(_elapsed) => {
464                        tracing::warn!(
465                            cluster_id = %cluster_id,
466                            timeout_secs = per_cluster_timeout.as_secs(),
467                            "v0.10.1 m5 extract_triples_batch: abstract_cluster \
468                             timed out; deferring cluster to next batch tick \
469                             (the cluster's lack of a semantic_abstractions \
470                             row will re-select it on the next pass)"
471                        );
472                        deferred_count += 1;
473                        continue;
474                    }
475                }
476            } else {
477                call.await
478            };
479            match result {
480                Ok(abs) => {
481                    abstractions.push((cluster_id, abs));
482                }
483                Err(e) => {
484                    tracing::warn!(
485                        cluster_id = %cluster_id,
486                        error = %e,
487                        "v0.9.0 P4c extract_triples_batch: abstract_cluster \
488                         failed; cluster persists, abstraction retries on \
489                         next tick"
490                    );
491                }
492            }
493        }
494        ExtractTriplesBatchOutcome {
495            abstractions,
496            deferred_count,
497        }
498    }
499
500    /// Surface contradictions for the consolidation pass to flag
501    /// for resolution. Two-stage: cheap pure-Rust rule filter
502    /// (subject + predicate + temporal overlap) + LLM judge for
503    /// pairs that survive. See [`contradiction::detect_contradiction`]
504    /// for the algorithm + tests.
505    pub async fn detect_contradiction(
506        &self,
507        a: &Triple,
508        b: &Triple,
509    ) -> Result<Option<Contradiction>> {
510        contradiction::detect_contradiction(a, b, self.client.as_ref()).await
511    }
512}
513
514#[cfg(test)]
515mod from_env_tests {
516    use super::*;
517    use std::sync::Mutex;
518
519    // Env vars are process-global mutable state — serialize the tests
520    // here so cargo test's per-binary parallel runner doesn't race
521    // SOLO_CLUSTER_COSINE_THRESHOLD between cases. `unwrap_or_else
522    // (PoisonError::into_inner)` keeps a panicking test from poisoning
523    // the lock for the rest of the suite.
524    static ENV_LOCK: Mutex<()> = Mutex::new(());
525
526    /// Drops every steward env var on Drop so a panicking test doesn't
527    /// leak state into the next case (which would expect either "unset"
528    /// or its own set value). Cleanup list MUST stay in sync with the
529    /// const list at the top of the module.
530    struct EnvGuard;
531    impl Drop for EnvGuard {
532        fn drop(&mut self) {
533            // SAFETY: ENV_LOCK is held by the caller, so no other
534            // thread is concurrently reading or writing these vars.
535            for k in [
536                ENV_CLUSTER_COSINE_THRESHOLD,
537                ENV_CLUSTER_MIN_SIZE,
538                ENV_ABSTRACTION_MAX_TOKENS,
539                ENV_CONTRADICTION_CHECK_ENABLED,
540            ] {
541                unsafe { std::env::remove_var(k) };
542            }
543        }
544    }
545
546    /// Set ENV_CLUSTER_COSINE_THRESHOLD specifically. The other env
547    /// vars have their own helpers below.
548    fn set_env(value: &str) -> EnvGuard {
549        set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, value)
550    }
551
552    fn set_named_env(name: &str, value: &str) -> EnvGuard {
553        // SAFETY: ENV_LOCK held; the only other accessor in-process is
554        // our own `from_env`, which the caller invokes synchronously.
555        unsafe { std::env::set_var(name, value) };
556        EnvGuard
557    }
558
559    fn clear_env() -> EnvGuard {
560        // SAFETY: same as set_env.
561        for k in [
562            ENV_CLUSTER_COSINE_THRESHOLD,
563            ENV_CLUSTER_MIN_SIZE,
564            ENV_ABSTRACTION_MAX_TOKENS,
565            ENV_CONTRADICTION_CHECK_ENABLED,
566        ] {
567            unsafe { std::env::remove_var(k) };
568        }
569        EnvGuard
570    }
571
572    #[test]
573    fn unset_env_yields_default_threshold() {
574        let _lock =
575            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
576        let _g = clear_env();
577        let cfg = StewardConfig::from_env().expect("ok");
578        assert_eq!(
579            cfg.cluster_cosine_threshold,
580            StewardConfig::default().cluster_cosine_threshold
581        );
582    }
583
584    #[test]
585    fn empty_or_whitespace_yields_default_threshold() {
586        let _lock =
587            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
588        for value in ["", "   ", "\t"] {
589            let _g = set_env(value);
590            let cfg = StewardConfig::from_env().expect("ok");
591            assert_eq!(
592                cfg.cluster_cosine_threshold,
593                StewardConfig::default().cluster_cosine_threshold,
594                "unexpected override for empty value {value:?}"
595            );
596        }
597    }
598
599    #[test]
600    fn valid_value_overrides_default() {
601        let _lock =
602            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
603        let _g = set_env("0.92");
604        let cfg = StewardConfig::from_env().expect("0.92 is valid");
605        assert!(
606            (cfg.cluster_cosine_threshold - 0.92).abs() < 1e-6,
607            "got {}",
608            cfg.cluster_cosine_threshold
609        );
610    }
611
612    #[test]
613    fn boundary_one_is_accepted() {
614        let _lock =
615            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
616        let _g = set_env("1.0");
617        let cfg = StewardConfig::from_env().expect("1.0 is valid");
618        assert_eq!(cfg.cluster_cosine_threshold, 1.0);
619    }
620
621    #[test]
622    fn unparseable_value_errors() {
623        let _lock =
624            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
625        let _g = set_env("not-a-number");
626        let err = StewardConfig::from_env().unwrap_err();
627        assert!(matches!(err, Error::InvalidInput(_)), "got {err:?}");
628    }
629
630    #[test]
631    fn zero_is_rejected() {
632        let _lock =
633            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
634        let _g = set_env("0.0");
635        let err = StewardConfig::from_env().unwrap_err();
636        assert!(matches!(err, Error::InvalidInput(_)));
637    }
638
639    #[test]
640    fn negative_is_rejected() {
641        let _lock =
642            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
643        let _g = set_env("-0.1");
644        let err = StewardConfig::from_env().unwrap_err();
645        assert!(matches!(err, Error::InvalidInput(_)));
646    }
647
648    #[test]
649    fn above_one_is_rejected() {
650        let _lock =
651            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
652        let _g = set_env("1.01");
653        let err = StewardConfig::from_env().unwrap_err();
654        assert!(matches!(err, Error::InvalidInput(_)));
655    }
656
657    #[test]
658    fn nan_is_rejected() {
659        let _lock =
660            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
661        let _g = set_env("NaN");
662        let err = StewardConfig::from_env().unwrap_err();
663        assert!(matches!(err, Error::InvalidInput(_)));
664    }
665
666    // ---- ENV_CLUSTER_MIN_SIZE ----
667
668    #[test]
669    fn cluster_min_size_valid_overrides_default() {
670        let _lock =
671            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
672        let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "5");
673        let cfg = StewardConfig::from_env().expect("5 is valid");
674        assert_eq!(cfg.cluster_min_size, 5);
675    }
676
677    #[test]
678    fn cluster_min_size_zero_is_rejected() {
679        let _lock =
680            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
681        let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "0");
682        let err = StewardConfig::from_env().unwrap_err();
683        assert!(matches!(err, Error::InvalidInput(_)));
684    }
685
686    #[test]
687    fn cluster_min_size_unparseable_errors() {
688        let _lock =
689            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
690        let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "not-a-number");
691        let err = StewardConfig::from_env().unwrap_err();
692        assert!(matches!(err, Error::InvalidInput(_)));
693    }
694
695    // ---- ENV_ABSTRACTION_MAX_TOKENS ----
696
697    #[test]
698    fn abstraction_max_tokens_valid_overrides_default() {
699        let _lock =
700            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
701        let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "1024");
702        let cfg = StewardConfig::from_env().expect("1024 is valid");
703        assert_eq!(cfg.abstraction_max_tokens, 1024);
704    }
705
706    #[test]
707    fn abstraction_max_tokens_above_upper_bound_is_rejected() {
708        let _lock =
709            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
710        let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "131072");
711        let err = StewardConfig::from_env().unwrap_err();
712        assert!(matches!(err, Error::InvalidInput(_)));
713    }
714
715    #[test]
716    fn abstraction_max_tokens_zero_is_rejected() {
717        let _lock =
718            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
719        let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "0");
720        let err = StewardConfig::from_env().unwrap_err();
721        assert!(matches!(err, Error::InvalidInput(_)));
722    }
723
724    // ---- ENV_CONTRADICTION_CHECK_ENABLED ----
725
726    #[test]
727    fn contradiction_check_false_overrides_default_true() {
728        let _lock =
729            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
730        let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, "false");
731        let cfg = StewardConfig::from_env().expect("false is valid");
732        assert!(!cfg.contradiction_check_enabled);
733    }
734
735    #[test]
736    fn contradiction_check_accepts_case_insensitive() {
737        let _lock =
738            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
739        for value in ["True", "TRUE", "tRuE"] {
740            let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
741            let cfg =
742                StewardConfig::from_env().expect("case-insensitive true");
743            assert!(cfg.contradiction_check_enabled, "got false for {value:?}");
744        }
745        for value in ["False", "FALSE", "fAlSe"] {
746            let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
747            let cfg =
748                StewardConfig::from_env().expect("case-insensitive false");
749            assert!(!cfg.contradiction_check_enabled, "got true for {value:?}");
750        }
751    }
752
753    #[test]
754    fn contradiction_check_rejects_non_bool_strings() {
755        let _lock =
756            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
757        // Numeric / yes-no / on-off shorthand intentionally NOT
758        // accepted — strict bool spelling avoids ambiguity ("0" looks
759        // boolean-like in many languages but `bool::from_str` rejects it).
760        for value in ["1", "0", "yes", "no", "on", "off", "maybe"] {
761            let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
762            let err = StewardConfig::from_env().unwrap_err();
763            assert!(
764                matches!(err, Error::InvalidInput(_)),
765                "expected error for {value:?}"
766            );
767        }
768    }
769
770    // ---- cross-field ----
771
772    #[test]
773    fn all_four_env_vars_set_simultaneously_take_effect() {
774        let _lock =
775            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
776        let _g1 = set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, "0.91");
777        let _g2 = set_named_env(ENV_CLUSTER_MIN_SIZE, "5");
778        let _g3 = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "1024");
779        let _g4 =
780            set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, "false");
781        let cfg =
782            StewardConfig::from_env().expect("all four valid together");
783        assert!((cfg.cluster_cosine_threshold - 0.91).abs() < 1e-6);
784        assert_eq!(cfg.cluster_min_size, 5);
785        assert_eq!(cfg.abstraction_max_tokens, 1024);
786        assert!(!cfg.contradiction_check_enabled);
787    }
788
789    // ----------------------------------------------------------------
790    // v0.11.1 — from_settings_then_env layering
791    // ----------------------------------------------------------------
792
793    /// Both inputs `None` AND no env set → result equals
794    /// `StewardConfig::default()`. Pins the backward-compat path: pre-
795    /// v0.11.1 configs (no `[steward]` block) and no env vars must not
796    /// change daemon behaviour.
797    #[test]
798    fn from_settings_then_env_no_overrides_yields_default() {
799        let _lock =
800            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
801        let _g = clear_env();
802        let cfg = StewardConfig::from_settings_then_env(None, None)
803            .expect("no inputs, no env, must succeed");
804        let d = StewardConfig::default();
805        assert_eq!(cfg.cluster_min_size, d.cluster_min_size);
806        assert!((cfg.cluster_cosine_threshold - d.cluster_cosine_threshold).abs() < 1e-6);
807        assert_eq!(cfg.abstraction_max_tokens, d.abstraction_max_tokens);
808        assert_eq!(cfg.contradiction_check_enabled, d.contradiction_check_enabled);
809    }
810
811    /// TOML-supplied values flow through when env vars are unset.
812    /// This is the v0.11.1 happy path: operator writes
813    /// `[steward] cluster_min_size = 4` + `cluster_cosine_threshold = 0.7`
814    /// and that's exactly what the daemon uses for clustering.
815    #[test]
816    fn from_settings_then_env_toml_values_take_effect_without_env() {
817        let _lock =
818            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
819        let _g = clear_env();
820        let cfg = StewardConfig::from_settings_then_env(Some(4), Some(0.7))
821            .expect("valid TOML inputs");
822        assert_eq!(cfg.cluster_min_size, 4);
823        assert!((cfg.cluster_cosine_threshold - 0.7).abs() < 1e-6);
824    }
825
826    /// Env vars beat TOML overrides — the operator's runtime escape
827    /// hatch (set `SOLO_CLUSTER_*` in the shell to override the config
828    /// file without editing it) must keep working in v0.11.1.
829    #[test]
830    fn from_settings_then_env_env_wins_over_toml() {
831        let _lock =
832            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
833        let _g1 = set_named_env(ENV_CLUSTER_MIN_SIZE, "8");
834        let _g2 = set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, "0.95");
835        // TOML asks for (3, 0.5); env asks for (8, 0.95). Env must win.
836        let cfg = StewardConfig::from_settings_then_env(Some(3), Some(0.5))
837            .expect("env wins over TOML, both valid");
838        assert_eq!(cfg.cluster_min_size, 8, "env override beats TOML");
839        assert!(
840            (cfg.cluster_cosine_threshold - 0.95).abs() < 1e-6,
841            "env override beats TOML"
842        );
843    }
844
845    /// Partial layering: env sets ONLY `SOLO_CLUSTER_MIN_SIZE`; TOML
846    /// supplies the cosine threshold. Resolution should pick env for
847    /// min_size + TOML for the threshold (per-field precedence, not
848    /// all-or-nothing).
849    #[test]
850    fn from_settings_then_env_partial_env_keeps_toml_for_untouched_field() {
851        let _lock =
852            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
853        let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "9");
854        let cfg = StewardConfig::from_settings_then_env(None, Some(0.42))
855            .expect("env on min_size, TOML on threshold");
856        assert_eq!(cfg.cluster_min_size, 9, "env override applied");
857        assert!(
858            (cfg.cluster_cosine_threshold - 0.42).abs() < 1e-6,
859            "env unset for threshold → TOML 0.42 survives"
860        );
861    }
862
863    /// Invalid TOML values surface a typed error rather than silently
864    /// falling back. Mirrors the env-var parser's strictness so
865    /// operators see the same diagnostic regardless of which surface
866    /// they used.
867    #[test]
868    fn from_settings_then_env_rejects_bad_toml_threshold() {
869        let _lock =
870            ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
871        let _g = clear_env();
872        // > 1.0 is out of range.
873        let err =
874            StewardConfig::from_settings_then_env(None, Some(1.5)).unwrap_err();
875        assert!(matches!(err, Error::InvalidInput(_)));
876        // Zero is out of range (must be > 0.0).
877        let err =
878            StewardConfig::from_settings_then_env(None, Some(0.0)).unwrap_err();
879        assert!(matches!(err, Error::InvalidInput(_)));
880        // NaN.
881        let err =
882            StewardConfig::from_settings_then_env(None, Some(f32::NAN)).unwrap_err();
883        assert!(matches!(err, Error::InvalidInput(_)));
884        // cluster_min_size = 0 is rejected.
885        let err =
886            StewardConfig::from_settings_then_env(Some(0), None).unwrap_err();
887        assert!(matches!(err, Error::InvalidInput(_)));
888    }
889}
890
#[cfg(test)]
mod has_llm_tests {
    use super::*;
    use crate::test_support::StubLlmClient;

    /// When only the default stub is configured, `has_llm()` reports
    /// `false`. The writer's contradiction sweep gates on this flag and
    /// must stay off when nothing but a stub is available.
    #[test]
    fn has_llm_false_for_default_stub() {
        let steward = Steward::new(
            Arc::new(StubLlmClient::default_stub()),
            StewardConfig::default(),
        );
        assert!(!steward.has_llm());
    }

    /// Setting the stub's `pretend_real_llm(true)` flag makes
    /// `has_llm()` report `true`. Storage-layer integration tests rely
    /// on this to drive the contradiction-sweep path with canned
    /// responses.
    #[test]
    fn has_llm_true_when_stub_pretends() {
        let pretending = StubLlmClient::default_stub().pretend_real_llm(true);
        let steward = Steward::new(Arc::new(pretending), StewardConfig::default());
        assert!(steward.has_llm());
    }
}
919
920// ----------------------------------------------------------------
921// v0.10.1 (P4 audit m5): extract_triples_batch per-cluster timeout
922// ----------------------------------------------------------------
#[cfg(test)]
mod extract_triples_batch_timeout_tests {
    use super::*;
    use crate::test_support::StubLlmClient;
    use async_trait::async_trait;
    use solo_core::{
        Confidence, Embedding, EmbeddingDtype, Episode, MemoryId, Message, Role, Tier,
    };
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// A custom `LlmClient` stub whose `complete()` future sleeps for a
    /// configurable per-call duration before returning a parseable
    /// abstraction response. Used to force `tokio::time::timeout` to
    /// fire inside `extract_triples_batch`.
    ///
    /// Call N (0-indexed) sleeps `delays_ms[N]` ms. Once N runs past the
    /// end of the list, the last entry is reused for every later call,
    /// so a single `vec![5_000]` makes EVERY call slow. An empty list
    /// means no delay at all.
    struct SlowStub {
        /// Per-call delays in milliseconds, indexed by call number.
        delays_ms: Vec<u64>,
        /// Number of `complete()` calls entered so far. This includes
        /// calls later cancelled by a timeout.
        calls: AtomicUsize,
    }

    impl SlowStub {
        fn new(delays_ms: Vec<u64>) -> Self {
            Self {
                delays_ms,
                calls: AtomicUsize::new(0),
            }
        }
        fn call_count(&self) -> usize {
            self.calls.load(Ordering::Relaxed)
        }
    }

    #[async_trait]
    impl LlmClient for SlowStub {
        fn name(&self) -> &str {
            "slow-stub"
        }
        async fn complete(&self, _messages: &[Message]) -> Result<Message> {
            // Count on entry, before sleeping, so a call that the batch
            // timeout cancels mid-sleep is still recorded as attempted.
            let idx = self.calls.fetch_add(1, Ordering::Relaxed);
            let delay = self
                .delays_ms
                .get(idx)
                .or_else(|| self.delays_ms.last())
                .copied()
                .unwrap_or(0);
            tokio::time::sleep(Duration::from_millis(delay)).await;
            Ok(Message {
                role: Role::Assistant,
                content: r#"{"content":"ok","confidence":0.7,"triples":[]}"#
                    .into(),
            })
        }
        fn is_real_llm(&self) -> bool {
            true
        }
    }

    /// Build a minimal cluster + one episode so `abstract_cluster`
    /// has shapeful input. The contents don't matter — the SlowStub
    /// ignores the prompt.
    fn mk_cluster_with_episode() -> (Cluster, Vec<Episode>) {
        let cluster_id = MemoryId::new();
        let memory_id = MemoryId::new();
        let episode = Episode {
            memory_id,
            ts_ms: 0,
            source_type: "user_message".into(),
            content: "hello".into(),
            encoding_context: Default::default(),
            provenance: None,
            confidence: Confidence::new(0.9).expect("0.9 is a valid confidence"),
            strength: 0.5,
            salience: 0.5,
            tier: Tier::Hot,
            source_id: None,
        };
        // 4-dim f32 unit vector, stored as raw bytes.
        let centroid = Embedding {
            data: bytemuck::cast_slice(&[1.0f32, 0.0, 0.0, 0.0]).to_vec(),
            dim: 4,
            dtype: EmbeddingDtype::F32,
        };
        let cluster = Cluster {
            cluster_id,
            episode_ids: vec![memory_id],
            centroid: Some(centroid),
            coherence: 0.95,
        };
        (cluster, vec![episode])
    }

    /// Multi-threaded runtime with timers enabled; `tokio::time::sleep`
    /// and `tokio::time::timeout` both need the time driver.
    fn rt() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .unwrap()
    }

    /// m5 test 1 (the main pin): when one cluster's
    /// `abstract_cluster` hangs past the per-cluster timeout, the
    /// batch CONTINUES with the next cluster. The hung cluster is
    /// counted as deferred — not lumped in with errors.
    #[test]
    fn extract_triples_batch_continues_after_cluster_timeout() {
        let rt = rt();
        rt.block_on(async {
            // Cluster 1: fast (50ms). Cluster 2: hangs (3s — well past
            // the 200ms timeout). Cluster 3: fast (50ms).
            let stub = Arc::new(SlowStub::new(vec![50, 3_000, 50]));
            let steward = Steward::new(stub.clone(), StewardConfig::default());

            let inputs = vec![
                mk_cluster_with_episode(),
                mk_cluster_with_episode(),
                mk_cluster_with_episode(),
            ];

            let outcome = steward
                .extract_triples_batch(inputs, Duration::from_millis(200))
                .await;

            assert_eq!(
                outcome.abstractions.len(),
                2,
                "clusters 1 + 3 must succeed while cluster 2 times out"
            );
            assert_eq!(
                outcome.deferred_count, 1,
                "cluster 2 must be counted as deferred (timeout), not failed"
            );
            // Verify all three calls were attempted (the loop didn't
            // bail out after the timeout).
            assert_eq!(
                stub.call_count(),
                3,
                "every cluster's abstract_cluster must be attempted; \
                 a timeout on one MUST NOT abort the batch"
            );
        });
    }

    /// m5 test 2: happy path — all clusters complete in time → zero
    /// deferred. Pins that the timeout path doesn't introduce
    /// spurious deferrals.
    #[test]
    fn extract_triples_batch_returns_succeeded_count() {
        let rt = rt();
        rt.block_on(async {
            let stub = Arc::new(SlowStub::new(vec![10]));
            let steward = Steward::new(stub, StewardConfig::default());

            let inputs = vec![
                mk_cluster_with_episode(),
                mk_cluster_with_episode(),
                mk_cluster_with_episode(),
            ];

            let outcome = steward
                .extract_triples_batch(inputs, Duration::from_secs(5))
                .await;

            assert_eq!(outcome.abstractions.len(), 3);
            assert_eq!(
                outcome.deferred_count, 0,
                "no clusters exceeded the timeout"
            );
        });
    }

    /// m5 test 3: clusters that return Err (NOT timeouts) are
    /// distinct from deferred. The deferred counter is for timeout
    /// hangs specifically; a per-cluster Err is not counted toward
    /// `deferred_count` (it's the implicit `cluster_count -
    /// abstractions_built - deferred_count` half).
    ///
    /// We use a real `StubLlmClient` with a canned response that
    /// fails to parse, so `abstract_cluster` returns an Err.
    /// `SlowStub` has no failure mode, and adding one would only
    /// complicate it.
    #[test]
    fn extract_triples_batch_failure_distinct_from_timeout() {
        let rt = rt();
        rt.block_on(async {
            // Stub returns a non-JSON response → abstraction parse
            // fails → abstract_cluster returns Err. Note we still need
            // `pretend_real_llm(true)` so `has_llm()` returns true.
            let stub = Arc::new(
                StubLlmClient::with_canned("garbage-stub", "NOT-JSON")
                    .pretend_real_llm(true),
            );
            let steward = Steward::new(stub, StewardConfig::default());

            let inputs = vec![mk_cluster_with_episode()];
            let outcome = steward
                .extract_triples_batch(inputs, Duration::from_secs(5))
                .await;

            assert_eq!(
                outcome.abstractions.len(),
                0,
                "the parse failure means no abstraction lands"
            );
            assert_eq!(
                outcome.deferred_count, 0,
                "an Err return is NOT a deferral — only timeout-elapsed \
                 counts as deferred. failed clusters are computed by the \
                 caller as cluster_count - abstractions_built - deferred"
            );
        });
    }

    /// m5 test 4: zero-Duration disables the timeout. Verifies the
    /// operator escape hatch (`[triples] cluster_timeout_secs = 0`
    /// → `Duration::ZERO`). The slow cluster STILL completes
    /// because we never wrapped it.
    #[test]
    fn extract_triples_batch_zero_timeout_disables_per_cluster_timeout() {
        let rt = rt();
        rt.block_on(async {
            // 100ms delay. If a zero Duration were (wrongly) handed to
            // `tokio::time::timeout`, the deadline would already have
            // passed while this call was still sleeping, so the cluster
            // would be deferred instead of succeeding.
            let stub = Arc::new(SlowStub::new(vec![100]));
            let steward = Steward::new(stub, StewardConfig::default());
            let inputs = vec![mk_cluster_with_episode()];
            let outcome = steward
                .extract_triples_batch(inputs, Duration::ZERO)
                .await;
            assert_eq!(
                outcome.abstractions.len(),
                1,
                "zero Duration disables the timeout; the cluster must \
                 succeed even with a delay"
            );
            assert_eq!(outcome.deferred_count, 0);
        });
    }

    /// m5 test 5: when the steward has no LLM (`has_llm() == false`),
    /// the batch short-circuits with an empty outcome regardless of
    /// the per-cluster timeout. Pins the fast path.
    #[test]
    fn extract_triples_batch_no_llm_returns_empty_outcome() {
        let rt = rt();
        rt.block_on(async {
            // default stub: is_real_llm == false → has_llm == false.
            let stub = Arc::new(StubLlmClient::default_stub());
            let steward = Steward::new(stub, StewardConfig::default());
            let inputs = vec![mk_cluster_with_episode()];
            let outcome = steward
                .extract_triples_batch(inputs, Duration::from_secs(5))
                .await;
            assert!(outcome.abstractions.is_empty());
            assert_eq!(outcome.deferred_count, 0);
        });
    }

    /// m5 test 6: the passed Duration is actually wired through.
    /// Test 1 already pins the deferral. This test differs by using a
    /// generous timeout that should comfortably swallow the call. If
    /// the wiring were broken (e.g. a different Duration accidentally
    /// hardcoded), the slow call would be deferred inappropriately.
    ///
    /// Pin both directions: 50ms call with 1s timeout succeeds; the
    /// SAME 50ms call with a 10ms timeout defers.
    #[test]
    fn extract_triples_batch_uses_passed_timeout() {
        let rt = rt();
        rt.block_on(async {
            // 50ms call, generous 1s timeout → success.
            let stub = Arc::new(SlowStub::new(vec![50]));
            let steward = Steward::new(stub, StewardConfig::default());
            let outcome = steward
                .extract_triples_batch(
                    vec![mk_cluster_with_episode()],
                    Duration::from_secs(1),
                )
                .await;
            assert_eq!(outcome.abstractions.len(), 1);
            assert_eq!(outcome.deferred_count, 0);

            // SAME 50ms call shape, tight 10ms timeout → deferral.
            let stub2 = Arc::new(SlowStub::new(vec![50]));
            let steward2 = Steward::new(stub2, StewardConfig::default());
            let outcome2 = steward2
                .extract_triples_batch(
                    vec![mk_cluster_with_episode()],
                    Duration::from_millis(10),
                )
                .await;
            assert_eq!(outcome2.abstractions.len(), 0);
            assert_eq!(outcome2.deferred_count, 1);
        });
    }
}