solo_steward/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Solo steward: consolidation pass (SWS-equivalent dedup, REM-equivalent
4//! integration, decay sweep, reconsolidation).
5//!
6//! Per ADR-0002, `Steward` is a struct, not a trait. The LLM backend is
7//! the swap point (via `solo_core::LlmClient`); the consolidation logic
8//! lives in one place.
9//!
//! ## Component status
//!
//! - **`cluster_episodes`**: pure-deterministic SWS
//!   pass. See [`cluster`] for the algorithm + tests. Implementation
//!   does not call the LLM and does not touch the DB — caller pairs
//!   `(Episode, Embedding)` from SQL upstream.
//! - **`abstract_cluster`**: REM-equivalent LLM call that
//!   produces a [`SemanticAbstraction`] for each cluster. Lives in
//!   [`abstraction`].
//! - **`detect_contradiction`**: cheap rule filter +
//!   LLM judge for the pairs that survive it. Lives in [`contradiction`].
21//!
22//! Storage wiring (writer → cluster → SQL persistence) lands in
23//! `solo-storage` separately.
24
25use std::sync::Arc;
26use std::time::Duration;
27
28use solo_core::{
29 Cluster, Contradiction, Embedding, Episode, Error, LlmClient, Result, SemanticAbstraction,
30 Triple,
31};
32
/// Env var name for tuning the centroid-cosine threshold used by
/// every clustering / existing-merge / merge-candidate count site.
/// The value must parse as a finite `f32` in `(0.0, 1.0]`; when the
/// variable is unset or blank the [`StewardConfig::default`] value
/// applies.
///
/// One knob governs all of those sites to preserve the SQL-sync
/// invariant between the writer's existing-merge and the doctor's
/// count (see the `solo_storage::merge_candidates` module docstring).
pub const ENV_CLUSTER_COSINE_THRESHOLD: &str =
    "SOLO_CLUSTER_COSINE_THRESHOLD";
40
/// Env var name for tuning the minimum cluster size (number of
/// episodes) below which a candidate cluster is discarded by the
/// SWS-equivalent clustering pass. Must be a positive integer.
/// Default 2 (the `cluster_min_size` value in
/// [`StewardConfig::default`]).
pub const ENV_CLUSTER_MIN_SIZE: &str = "SOLO_CLUSTER_MIN_SIZE";
46
/// Env var name for tuning the `max_tokens` value requested by the
/// abstraction LLM call. Must be a positive integer in
/// `[1, 65_536]`; the upper bound catches typo'd values like
/// `1_000_000` that would silently inflate per-call cost.
/// Default 512.
pub const ENV_ABSTRACTION_MAX_TOKENS: &str =
    "SOLO_ABSTRACTION_MAX_TOKENS";
54
/// Env var name for the contradiction-detection toggle. Accepts
/// exactly `true` or `false`, case-insensitive; any other spelling
/// is rejected by [`StewardConfig::from_env`].
///
/// When `false`, the consolidation pass skips the rule filter + LLM
/// judge entirely and `contradictions_found` stays 0. Useful for
/// cost-constrained operators willing to lose contradiction
/// tracking. Default `true`.
pub const ENV_CONTRADICTION_CHECK_ENABLED: &str =
    "SOLO_CONTRADICTION_CHECK_ENABLED";
63
64pub mod abstraction;
65pub mod cluster;
66pub mod contradiction;
67
68#[cfg(any(test, feature = "test-support"))]
69pub mod test_support;
70
/// Consolidation steward: an LLM backend paired with the tuning
/// knobs in [`StewardConfig`].
///
/// Built with [`Steward::new`]. The clustering method uses only the
/// config; the abstraction and contradiction methods forward the
/// client to their respective modules.
pub struct Steward {
    /// LLM backend handed to `abstraction::abstract_cluster` and
    /// `contradiction::detect_contradiction`; also answers
    /// [`Steward::has_llm`].
    client: Arc<dyn LlmClient>,
    /// Thresholds and toggles; borrowed out via [`Steward::config`].
    config: StewardConfig,
}
75
/// Tuning knobs for the consolidation pass.
///
/// Obtain one from [`StewardConfig::default`],
/// [`StewardConfig::from_env`], or
/// [`StewardConfig::from_settings_then_env`].
#[derive(Debug, Clone)]
pub struct StewardConfig {
    /// Minimum number of episodes a candidate cluster must contain;
    /// tunable via [`ENV_CLUSTER_MIN_SIZE`]. Validated as `>= 1`.
    pub cluster_min_size: usize,
    /// Centroid-cosine threshold; tunable via
    /// [`ENV_CLUSTER_COSINE_THRESHOLD`]. Validated as a finite value
    /// in `(0.0, 1.0]`.
    pub cluster_cosine_threshold: f32,
    /// `max_tokens` for the abstraction LLM call; tunable via
    /// [`ENV_ABSTRACTION_MAX_TOKENS`]. Validated as `[1, 65_536]`.
    pub abstraction_max_tokens: usize,
    /// Contradiction-detection toggle; tunable via
    /// [`ENV_CONTRADICTION_CHECK_ENABLED`].
    pub contradiction_check_enabled: bool,
}
83
84impl Default for StewardConfig {
85 fn default() -> Self {
86 Self {
87 // Lowered from 3 → 2 to let small-corpus operators see
88 // clustering kick in on ≤30 episodes. The original 3 is
89 // appropriate at scale but creates a "nothing happens for
90 // weeks" UX for a personal-Jarvis user with sparse memory.
91 cluster_min_size: 2,
92 // Lowered from 0.85 → 0.55 for the bundled `all-MiniLM-L6-v2`
93 // embedder, which produces conservative similarity scores
94 // (topically-related pairs land in the 0.45-0.65 band rather
95 // than 0.80+). The 0.85 default was tuned for a stronger
96 // embedder; on the bundled model it catches near-paraphrases
97 // only and starves the consolidate pass of clusters.
98 //
99 // v0.11.1: both `cluster_min_size` and `cluster_cosine_threshold`
100 // are also exposed via `[steward]` in `solo.config.toml` (see
101 // `solo_storage::StewardSettings`). Operators who want
102 // different values from these baked-in defaults set them
103 // there (or via the long-standing `SOLO_CLUSTER_*` env vars,
104 // which still win); these defaults are now the floor when
105 // neither TOML nor env touches the knob.
106 cluster_cosine_threshold: 0.55,
107 abstraction_max_tokens: 512,
108 contradiction_check_enabled: true,
109 }
110 }
111}
112
/// v0.10.1 (P4 audit m5): outcome of [`Steward::extract_triples_batch`].
///
/// Carries both the per-cluster successes (each abstraction is keyed by
/// its cluster's `MemoryId`) AND the count of clusters that timed out
/// during their per-cluster LLM call. The deferred count surfaces in
/// the writer-actor's `MemoryTriplesExtract` audit row as
/// `details_json.clusters_deferred`, giving operators an explicit
/// "something is slow" signal in the audit log without having to grep
/// `tracing::warn!` lines.
///
/// Hard failures (the abstraction call returns an `Err` rather than
/// timing out) are LOGGED but NOT counted in `deferred_count`. They're
/// the implicit `clusters_failed` half of the audit row, computed by
/// the writer as
/// `cluster_count - abstractions_built - clusters_deferred`. The split
/// is operator-actionable: deferred → slow backend, failed → backend
/// rejected the prompt.
#[derive(Debug, Default, Clone)]
pub struct ExtractTriplesBatchOutcome {
    /// One entry per cluster whose `abstract_cluster` call succeeded,
    /// as `(cluster id, abstraction)`.
    /// Iteration order matches the input order; the daemon-side
    /// `AttachAbstractionBatch` handler persists these in a single
    /// outer tx (with per-cluster `SAVEPOINT`s, per the v0.9.0 P4-
    /// revision M2 hardening).
    pub abstractions: Vec<(solo_core::MemoryId, SemanticAbstraction)>,
    /// Number of clusters whose `abstract_cluster` call exceeded
    /// `per_cluster_timeout`. Such clusters are not present in
    /// `abstractions`; the cluster's lack of a
    /// `semantic_abstractions` row will re-select it on the next
    /// batch tick.
    pub deferred_count: usize,
}
143
/// Look up `name` in the process environment and return its value with
/// surrounding whitespace stripped.
///
/// Yields `None` when the variable is missing, when `std::env::var`
/// reports an error for it, or when nothing is left after trimming.
/// Blank values count as "unset" because some shells assign an empty
/// string to mean "leave it at the default" rather than "push the
/// empty value through the parser".
fn env_trimmed(name: &str) -> Option<String> {
    let raw = std::env::var(name).ok()?;
    match raw.trim() {
        "" => None,
        value => Some(value.to_string()),
    }
}
159
160impl StewardConfig {
161 /// Build a config from process env vars, falling back to
162 /// [`StewardConfig::default`] for unset / empty fields.
163 ///
164 /// Honoured env vars (all optional; unset → default):
165 ///
166 /// - [`ENV_CLUSTER_COSINE_THRESHOLD`] — finite f32 in
167 /// `(0.0, 1.0]`. Default `0.85`. Affects writer's clustering
168 /// pass, in-run merge, existing-vs-existing merge, and
169 /// doctor's merge-candidate count uniformly (SQL+threshold-
170 /// sync invariant — see `solo_storage::merge_candidates`).
171 /// - [`ENV_CLUSTER_MIN_SIZE`] — positive integer. Default `3`.
172 /// Clusters with fewer episodes than this are discarded by
173 /// the SWS-equivalent clustering pass.
174 /// - [`ENV_ABSTRACTION_MAX_TOKENS`] — positive integer in
175 /// `[1, 65_536]`. Default `512`. The `max_tokens` value the
176 /// abstraction LLM call sends. Upper bound catches typo'd
177 /// values that would silently inflate per-call cost.
178 /// - [`ENV_CONTRADICTION_CHECK_ENABLED`] — `true` or `false`,
179 /// case-insensitive. Default `true`. When `false`, skips
180 /// the contradiction rule filter + LLM judge entirely.
181 ///
182 /// Returns `Err(Error::InvalidInput)` if any env var is set but
183 /// can't be parsed or is out of range. Operators would rather
184 /// see early failure than silently fall back to defaults (which
185 /// would mask a typo'd value). All-or-nothing: if any field
186 /// fails validation, the whole call errors and the prior fields'
187 /// successful parses don't take effect — daemon/consolidate
188 /// startup fails, operator fixes the env, retries.
189 pub fn from_env() -> Result<Self> {
190 let mut cfg = Self::default();
191
192 if let Some(raw) = env_trimmed(ENV_CLUSTER_COSINE_THRESHOLD) {
193 let parsed: f32 = raw.parse().map_err(|_| {
194 Error::invalid_input(format!(
195 "{ENV_CLUSTER_COSINE_THRESHOLD}: not a valid f32 ({raw:?})"
196 ))
197 })?;
198 if !parsed.is_finite() || parsed <= 0.0 || parsed > 1.0 {
199 return Err(Error::invalid_input(format!(
200 "{ENV_CLUSTER_COSINE_THRESHOLD}: must be a finite f32 in (0.0, 1.0], got {parsed}"
201 )));
202 }
203 cfg.cluster_cosine_threshold = parsed;
204 }
205
206 if let Some(raw) = env_trimmed(ENV_CLUSTER_MIN_SIZE) {
207 let parsed: usize = raw.parse().map_err(|_| {
208 Error::invalid_input(format!(
209 "{ENV_CLUSTER_MIN_SIZE}: not a valid non-negative integer ({raw:?})"
210 ))
211 })?;
212 if parsed < 1 {
213 return Err(Error::invalid_input(format!(
214 "{ENV_CLUSTER_MIN_SIZE}: must be >= 1, got {parsed}"
215 )));
216 }
217 cfg.cluster_min_size = parsed;
218 }
219
220 if let Some(raw) = env_trimmed(ENV_ABSTRACTION_MAX_TOKENS) {
221 let parsed: usize = raw.parse().map_err(|_| {
222 Error::invalid_input(format!(
223 "{ENV_ABSTRACTION_MAX_TOKENS}: not a valid non-negative integer ({raw:?})"
224 ))
225 })?;
226 if !(1..=65_536).contains(&parsed) {
227 return Err(Error::invalid_input(format!(
228 "{ENV_ABSTRACTION_MAX_TOKENS}: must be in [1, 65536], got {parsed}"
229 )));
230 }
231 cfg.abstraction_max_tokens = parsed;
232 }
233
234 if let Some(raw) = env_trimmed(ENV_CONTRADICTION_CHECK_ENABLED) {
235 let parsed = match raw.to_ascii_lowercase().as_str() {
236 "true" => true,
237 "false" => false,
238 _ => {
239 return Err(Error::invalid_input(format!(
240 "{ENV_CONTRADICTION_CHECK_ENABLED}: must be \"true\" or \"false\" (case-insensitive), got {raw:?}"
241 )));
242 }
243 };
244 cfg.contradiction_check_enabled = parsed;
245 }
246
247 Ok(cfg)
248 }
249
250 /// v0.11.1: Build a config by layering TOML overrides under env-var
251 /// overrides, both on top of [`StewardConfig::default`]. Used by the
252 /// daemon and one-shot CLI paths so an operator can pin
253 /// `cluster_min_size` + `cluster_cosine_threshold` in
254 /// `solo.config.toml` and still let `SOLO_CLUSTER_*` env vars win
255 /// per-runtime (the v0.10.x escape-hatch contract).
256 ///
257 /// Resolution order (lowest precedence first):
258 /// 1. [`StewardConfig::default`]
259 /// 2. TOML `[steward]` block — `Some(_)` values from the supplied
260 /// `toml_min_size` / `toml_cosine_threshold`
261 /// 3. Env vars — `SOLO_CLUSTER_MIN_SIZE` / `SOLO_CLUSTER_COSINE_THRESHOLD`
262 /// via [`Self::from_env`]
263 ///
264 /// The two TOML values are passed by value rather than wrapped in a
265 /// dedicated struct so `solo-steward` doesn't depend on
266 /// `solo-storage::config::StewardSettings` (which would be circular
267 /// — `solo-storage` already deps on `solo-steward`). The
268 /// `solo-storage` side owns the TOML schema; this constructor's
269 /// signature is the carry-the-Option boundary.
270 ///
271 /// Validation: both TOML overrides are validated to the same bounds
272 /// as the env vars (`cluster_min_size >= 1`,
273 /// `cluster_cosine_threshold` finite in `(0.0, 1.0]`). A bad TOML
274 /// value surfaces as `Error::InvalidInput` with the bound and the
275 /// offending value, matching the env-var error messages so the
276 /// operator sees the same diagnostic regardless of which surface
277 /// they used.
278 pub fn from_settings_then_env(
279 toml_min_size: Option<usize>,
280 toml_cosine_threshold: Option<f32>,
281 ) -> Result<Self> {
282 let mut cfg = Self::default();
283
284 if let Some(parsed) = toml_min_size {
285 if parsed < 1 {
286 return Err(Error::invalid_input(format!(
287 "[steward] cluster_min_size: must be >= 1, got {parsed}"
288 )));
289 }
290 cfg.cluster_min_size = parsed;
291 }
292
293 if let Some(parsed) = toml_cosine_threshold {
294 if !parsed.is_finite() || parsed <= 0.0 || parsed > 1.0 {
295 return Err(Error::invalid_input(format!(
296 "[steward] cluster_cosine_threshold: must be a finite f32 in (0.0, 1.0], got {parsed}"
297 )));
298 }
299 cfg.cluster_cosine_threshold = parsed;
300 }
301
302 // Layer env vars on top. We re-run the env parser (rather than
303 // skip-to-fields) so `abstraction_max_tokens` /
304 // `contradiction_check_enabled` continue to read from env — the
305 // TOML surface only owns the clustering pair.
306 let env_cfg = Self::from_env()?;
307 // Preserve TOML cluster values UNLESS the env actually set
308 // something. `from_env()` starts from `default()` and only
309 // mutates fields whose env var was set, so equality-with-default
310 // is the "env didn't touch this" signal.
311 let env_default = Self::default();
312 if (env_cfg.cluster_min_size, env_cfg.cluster_cosine_threshold)
313 != (env_default.cluster_min_size, env_default.cluster_cosine_threshold)
314 {
315 // At least one env var fired; take its values. The
316 // `from_env` parser left untouched fields at default —
317 // those are the ones where TOML should still rule.
318 if env_cfg.cluster_min_size != env_default.cluster_min_size {
319 cfg.cluster_min_size = env_cfg.cluster_min_size;
320 }
321 if env_cfg.cluster_cosine_threshold != env_default.cluster_cosine_threshold {
322 cfg.cluster_cosine_threshold = env_cfg.cluster_cosine_threshold;
323 }
324 }
325 // The non-cluster env-driven fields always take effect — TOML
326 // doesn't carry these, so `env_cfg`'s values (which equal default
327 // when env was unset) are the right source of truth.
328 cfg.abstraction_max_tokens = env_cfg.abstraction_max_tokens;
329 cfg.contradiction_check_enabled = env_cfg.contradiction_check_enabled;
330
331 Ok(cfg)
332 }
333}
334
335impl Steward {
336 pub fn new(client: Arc<dyn LlmClient>, config: StewardConfig) -> Self {
337 Self { client, config }
338 }
339
340 /// Borrow the configured thresholds. Useful for callers that want
341 /// to call `cluster::cluster_episodes` directly (e.g. tests, or
342 /// future fine-grained pipelines).
343 pub fn config(&self) -> &StewardConfig {
344 &self.config
345 }
346
347 /// True iff this `Steward`'s LLM client talks to a real backend
348 /// (Anthropic, OpenAI, Ollama, future candle/local). False for
349 /// stub clients used in tests.
350 ///
351 /// Callers gate LLM-dependent work on this so the system stays
352 /// usable in stub-only configurations: e.g. the writer's
353 /// contradiction sweep early-returns when this is `false`,
354 /// because a stub's canned response can't faithfully arbitrate
355 /// triples it has never seen. The cheap clustering pass and the
356 /// abstraction pass remain reachable through their own gates.
357 ///
358 /// Delegates to [`solo_core::LlmClient::is_real_llm`], the per-
359 /// backend distinction.
360 pub fn has_llm(&self) -> bool {
361 self.client.is_real_llm()
362 }
363
364 /// SWS-equivalent: cluster the supplied `(Episode, Embedding)`
365 /// pairs by UTC day + cosine threshold. Pure-deterministic; does
366 /// not call the LLM. See [`cluster::cluster_episodes`] for the
367 /// algorithm.
368 pub async fn cluster_episodes(
369 &self,
370 inputs: &[(Episode, Embedding)],
371 ) -> Result<Vec<Cluster>> {
372 // Trivial wrapper today; preserves the async signature for
373 // when the algorithm grows entity-aware bucketing that may
374 // need to fan out (e.g. small per-entity LLM checks).
375 cluster::cluster_episodes(inputs, &self.config)
376 }
377
378 /// REM-equivalent: ask the LLM to produce a semantic abstraction
379 /// of a cluster. Provenance is preserved — `derived_from`
380 /// references every source episode.
381 ///
382 /// `episodes` must contain (at minimum) every `MemoryId` in
383 /// `cluster.episode_ids`. The Steward intentionally does not
384 /// hold a DB handle (per ADR-0002 "Steward depends only on
385 /// solo-core"), so the caller pairs cluster + episodes upstream.
386 pub async fn abstract_cluster(
387 &self,
388 cluster: &Cluster,
389 episodes: &[Episode],
390 ) -> Result<SemanticAbstraction> {
391 abstraction::abstract_cluster(cluster, episodes, self.client.as_ref()).await
392 }
393
394 /// v0.9.0 P4c: batch entry-point for the daemon-side consolidate
395 /// timer. For each `(cluster, episodes)` pair, calls
396 /// [`Self::abstract_cluster`] and accumulates the resulting
397 /// `SemanticAbstraction` (with its embedded `triples`) into the
398 /// returned [`ExtractTriplesBatchOutcome::abstractions`].
399 ///
400 /// Per-cluster failures are LOGGED but do NOT abort the batch:
401 /// the daemon-side caller persists every successful abstraction
402 /// in a single `WriteCommand::AttachAbstractionBatch` tx. A
403 /// half-failed batch (some clusters succeeded, some failed) is
404 /// the documented behavior — the failed clusters retry on the
405 /// next tick. This matches the v0.8.x writer-actor's
406 /// per-cluster log-and-skip discipline; we just moved the loop.
407 ///
408 /// v0.10.1 (P4 audit m5): each per-cluster `abstract_cluster`
409 /// call is wrapped in [`tokio::time::timeout`] with the supplied
410 /// `per_cluster_timeout`. A hung LLM call (MCP peer that never
411 /// responds, slow Ollama model, network stall on Anthropic) no
412 /// longer blocks the next cluster — the timeout fires, the
413 /// cluster is counted as "deferred", the batch continues. The
414 /// deferred cluster's lack of a `semantic_abstractions` row means
415 /// the NEXT batch tick will pick it up again automatically (the
416 /// reader pool's `fetch_clusters_without_abstractions` query
417 /// selects on the same predicate).
418 ///
419 /// A `per_cluster_timeout` of `Duration::ZERO` DISABLES the
420 /// per-cluster timeout — every call runs to natural completion.
421 /// Symmetric with the `[triples] cluster_timeout_secs = 0`
422 /// operator escape hatch in `solo.config.toml`. Useful for
423 /// operators on very slow local backends; not recommended in
424 /// production (a single hung peer can stall the batch).
425 ///
426 /// Cancellation note (v0.10.1): when the timeout fires,
427 /// `tokio::time::timeout` drops the inner `abstract_cluster`
428 /// future. If the underlying LLM transport doesn't honor
429 /// cancellation (some HTTP clients with in-flight requests may
430 /// not), the actual RPC may complete in the background. The
431 /// response is discarded — no double-write — but the LLM-side
432 /// work is wasted. Acceptable for v0.10.1.
433 ///
434 /// Output shape: [`ExtractTriplesBatchOutcome`] carries the
435 /// abstractions vec + a `deferred_count` for the audit row. The
436 /// caller persists `semantic_abstractions(content, provenance,
437 /// confidence)` + N `triples` rows per entry, and threads
438 /// `deferred_count` into the `MemoryTriplesExtract` audit row's
439 /// `details_json.clusters_deferred`.
440 ///
441 /// `Steward::has_llm() == false` is a NO-OP fast path: returns
442 /// an outcome with empty abstractions + zero deferred. Mirrors
443 /// the v0.8.x writer-actor's "if no LLM wired, skip abstraction"
444 /// gate.
445 pub async fn extract_triples_batch(
446 &self,
447 clusters_with_episodes: Vec<(Cluster, Vec<Episode>)>,
448 per_cluster_timeout: Duration,
449 ) -> ExtractTriplesBatchOutcome {
450 if !self.has_llm() {
451 return ExtractTriplesBatchOutcome::default();
452 }
453 let mut abstractions: Vec<(solo_core::MemoryId, SemanticAbstraction)> =
454 Vec::with_capacity(clusters_with_episodes.len());
455 let mut deferred_count: usize = 0;
456 let timeout_enabled = !per_cluster_timeout.is_zero();
457 for (cluster, episodes) in clusters_with_episodes {
458 let cluster_id = cluster.cluster_id;
459 let call = self.abstract_cluster(&cluster, &episodes);
460 let result = if timeout_enabled {
461 match tokio::time::timeout(per_cluster_timeout, call).await {
462 Ok(inner) => inner,
463 Err(_elapsed) => {
464 tracing::warn!(
465 cluster_id = %cluster_id,
466 timeout_secs = per_cluster_timeout.as_secs(),
467 "v0.10.1 m5 extract_triples_batch: abstract_cluster \
468 timed out; deferring cluster to next batch tick \
469 (the cluster's lack of a semantic_abstractions \
470 row will re-select it on the next pass)"
471 );
472 deferred_count += 1;
473 continue;
474 }
475 }
476 } else {
477 call.await
478 };
479 match result {
480 Ok(abs) => {
481 abstractions.push((cluster_id, abs));
482 }
483 Err(e) => {
484 tracing::warn!(
485 cluster_id = %cluster_id,
486 error = %e,
487 "v0.9.0 P4c extract_triples_batch: abstract_cluster \
488 failed; cluster persists, abstraction retries on \
489 next tick"
490 );
491 }
492 }
493 }
494 ExtractTriplesBatchOutcome {
495 abstractions,
496 deferred_count,
497 }
498 }
499
500 /// Surface contradictions for the consolidation pass to flag
501 /// for resolution. Two-stage: cheap pure-Rust rule filter
502 /// (subject + predicate + temporal overlap) + LLM judge for
503 /// pairs that survive. See [`contradiction::detect_contradiction`]
504 /// for the algorithm + tests.
505 pub async fn detect_contradiction(
506 &self,
507 a: &Triple,
508 b: &Triple,
509 ) -> Result<Option<Contradiction>> {
510 contradiction::detect_contradiction(a, b, self.client.as_ref()).await
511 }
512}
513
514#[cfg(test)]
515mod from_env_tests {
516 use super::*;
517 use std::sync::Mutex;
518
// Env vars are process-global mutable state — serialize the tests
// here so cargo test's per-binary parallel runner doesn't race
// SOLO_CLUSTER_COSINE_THRESHOLD (or any other steward env var)
// between cases. Every test takes the lock with
// `unwrap_or_else(|p| p.into_inner())`, which keeps a panicking test
// from poisoning the lock for the rest of the suite.
static ENV_LOCK: Mutex<()> = Mutex::new(());
525
/// RAII cleanup token: removes every steward env var when dropped, so
/// a panicking test doesn't leak state into the next case (which
/// would expect either "unset" or its own set value). The cleanup
/// list MUST stay in sync with the `ENV_*` const list at the top of
/// the crate.
///
/// Note that dropping ANY guard removes ALL four variables, not just
/// the one the guard was created for.
struct EnvGuard;
impl Drop for EnvGuard {
    fn drop(&mut self) {
        // SAFETY: ENV_LOCK is held by the caller, so no other
        // thread is concurrently reading or writing these vars.
        for k in [
            ENV_CLUSTER_COSINE_THRESHOLD,
            ENV_CLUSTER_MIN_SIZE,
            ENV_ABSTRACTION_MAX_TOKENS,
            ENV_CONTRADICTION_CHECK_ENABLED,
        ] {
            unsafe { std::env::remove_var(k) };
        }
    }
}
545
/// Set `ENV_CLUSTER_COSINE_THRESHOLD` to `value` and return the
/// cleanup guard. Shorthand for the threshold tests; tests for the
/// other env vars call `set_named_env` with an explicit name.
fn set_env(value: &str) -> EnvGuard {
    set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, value)
}
551
/// Set the env var `name` to `value` and return an `EnvGuard` whose
/// drop removes every steward env var. The caller must already hold
/// `ENV_LOCK` and must keep the returned guard alive for as long as
/// the value is needed.
fn set_named_env(name: &str, value: &str) -> EnvGuard {
    // SAFETY: ENV_LOCK held; the only other accessor in-process is
    // our own `from_env`, which the caller invokes synchronously.
    unsafe { std::env::set_var(name, value) };
    EnvGuard
}
558
/// Remove all four steward env vars up front and return an `EnvGuard`
/// that removes them again on drop. Used by tests that need a
/// known-clean environment. The caller must already hold `ENV_LOCK`.
fn clear_env() -> EnvGuard {
    // SAFETY: same reasoning as `set_named_env` — ENV_LOCK is held
    // by the caller, so nothing else touches these vars concurrently.
    for k in [
        ENV_CLUSTER_COSINE_THRESHOLD,
        ENV_CLUSTER_MIN_SIZE,
        ENV_ABSTRACTION_MAX_TOKENS,
        ENV_CONTRADICTION_CHECK_ENABLED,
    ] {
        unsafe { std::env::remove_var(k) };
    }
    EnvGuard
}
571
/// With every steward env var removed, `from_env` reports the same
/// cosine threshold as `StewardConfig::default()`.
#[test]
fn unset_env_yields_default_threshold() {
    // Serialize against the other env-mutating tests; a poisoned
    // lock is recovered rather than propagated.
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = clear_env();
    let cfg = StewardConfig::from_env().expect("ok");
    assert_eq!(
        cfg.cluster_cosine_threshold,
        StewardConfig::default().cluster_cosine_threshold
    );
}
583
/// An empty or whitespace-only threshold value is treated as unset:
/// `from_env` succeeds and keeps the default threshold.
#[test]
fn empty_or_whitespace_yields_default_threshold() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    for value in ["", " ", "\t"] {
        // `_g` drops at the end of each iteration, which removes the
        // steward env vars before the next value is set.
        let _g = set_env(value);
        let cfg = StewardConfig::from_env().expect("ok");
        assert_eq!(
            cfg.cluster_cosine_threshold,
            StewardConfig::default().cluster_cosine_threshold,
            "unexpected override for empty value {value:?}"
        );
    }
}
598
/// A valid in-range threshold from the env replaces the default.
#[test]
fn valid_value_overrides_default() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_env("0.92");
    let cfg = StewardConfig::from_env().expect("0.92 is valid");
    // Tolerance comparison: 0.92 is not exactly representable in f32.
    assert!(
        (cfg.cluster_cosine_threshold - 0.92).abs() < 1e-6,
        "got {}",
        cfg.cluster_cosine_threshold
    );
}
611
/// The upper bound of the `(0.0, 1.0]` range is inclusive: `1.0` is
/// accepted as a threshold.
#[test]
fn boundary_one_is_accepted() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_env("1.0");
    let cfg = StewardConfig::from_env().expect("1.0 is valid");
    assert_eq!(cfg.cluster_cosine_threshold, 1.0);
}
620
/// A threshold value that is not a number yields
/// `Error::InvalidInput` instead of silently falling back.
#[test]
fn unparseable_value_errors() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_env("not-a-number");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)), "got {err:?}");
}
629
/// The lower bound of the `(0.0, 1.0]` range is exclusive: `0.0` is
/// rejected with `Error::InvalidInput`.
#[test]
fn zero_is_rejected() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_env("0.0");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)));
}
638
/// A negative threshold is out of range and rejected with
/// `Error::InvalidInput`.
#[test]
fn negative_is_rejected() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_env("-0.1");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)));
}
647
/// A threshold just above `1.0` is out of range and rejected with
/// `Error::InvalidInput`.
#[test]
fn above_one_is_rejected() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_env("1.01");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)));
}
656
/// `"NaN"` parses as an `f32` but is not finite, so the range check
/// must reject it with `Error::InvalidInput`.
#[test]
fn nan_is_rejected() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_env("NaN");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)));
}
665
666 // ---- ENV_CLUSTER_MIN_SIZE ----
667
/// A valid positive `SOLO_CLUSTER_MIN_SIZE` replaces the default
/// minimum cluster size.
#[test]
fn cluster_min_size_valid_overrides_default() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "5");
    let cfg = StewardConfig::from_env().expect("5 is valid");
    assert_eq!(cfg.cluster_min_size, 5);
}
676
/// A minimum cluster size of `0` violates the `>= 1` bound and is
/// rejected with `Error::InvalidInput`.
#[test]
fn cluster_min_size_zero_is_rejected() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "0");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)));
}
685
/// A non-numeric minimum cluster size is rejected with
/// `Error::InvalidInput`.
#[test]
fn cluster_min_size_unparseable_errors() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "not-a-number");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)));
}
694
695 // ---- ENV_ABSTRACTION_MAX_TOKENS ----
696
/// A valid in-range `SOLO_ABSTRACTION_MAX_TOKENS` replaces the
/// default token budget.
#[test]
fn abstraction_max_tokens_valid_overrides_default() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "1024");
    let cfg = StewardConfig::from_env().expect("1024 is valid");
    assert_eq!(cfg.abstraction_max_tokens, 1024);
}
705
/// A token budget above the `65_536` upper bound (here `131072`) is
/// rejected with `Error::InvalidInput`.
#[test]
fn abstraction_max_tokens_above_upper_bound_is_rejected() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "131072");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)));
}
714
/// A token budget of `0` is below the `[1, 65_536]` range and is
/// rejected with `Error::InvalidInput`.
#[test]
fn abstraction_max_tokens_zero_is_rejected() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "0");
    let err = StewardConfig::from_env().unwrap_err();
    assert!(matches!(err, Error::InvalidInput(_)));
}
723
724 // ---- ENV_CONTRADICTION_CHECK_ENABLED ----
725
/// Setting the toggle to `false` flips `contradiction_check_enabled`
/// away from its default of `true`.
#[test]
fn contradiction_check_false_overrides_default_true() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, "false");
    let cfg = StewardConfig::from_env().expect("false is valid");
    assert!(!cfg.contradiction_check_enabled);
}
734
/// The toggle accepts any ASCII casing of `true` and `false`.
#[test]
fn contradiction_check_accepts_case_insensitive() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    // Each `_g` drops at the end of its iteration, clearing the env
    // before the next spelling is set.
    for value in ["True", "TRUE", "tRuE"] {
        let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
        let cfg =
            StewardConfig::from_env().expect("case-insensitive true");
        assert!(cfg.contradiction_check_enabled, "got false for {value:?}");
    }
    for value in ["False", "FALSE", "fAlSe"] {
        let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
        let cfg =
            StewardConfig::from_env().expect("case-insensitive false");
        assert!(!cfg.contradiction_check_enabled, "got true for {value:?}");
    }
}
752
/// Anything other than a spelling of `true` / `false` is rejected
/// with `Error::InvalidInput`.
#[test]
fn contradiction_check_rejects_non_bool_strings() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    // Numeric / yes-no / on-off shorthand intentionally NOT
    // accepted — strict bool spelling avoids ambiguity ("0" looks
    // boolean-like in many languages but `bool::from_str` rejects it).
    for value in ["1", "0", "yes", "no", "on", "off", "maybe"] {
        let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
        let err = StewardConfig::from_env().unwrap_err();
        assert!(
            matches!(err, Error::InvalidInput(_)),
            "expected error for {value:?}"
        );
    }
}
769
770 // ---- cross-field ----
771
/// All four env vars set at once are each parsed and applied to
/// their own field.
#[test]
fn all_four_env_vars_set_simultaneously_take_effect() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    // All four guards live until the end of the function, so none of
    // the variables is removed before `from_env` runs.
    let _g1 = set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, "0.91");
    let _g2 = set_named_env(ENV_CLUSTER_MIN_SIZE, "5");
    let _g3 = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "1024");
    let _g4 =
        set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, "false");
    let cfg =
        StewardConfig::from_env().expect("all four valid together");
    assert!((cfg.cluster_cosine_threshold - 0.91).abs() < 1e-6);
    assert_eq!(cfg.cluster_min_size, 5);
    assert_eq!(cfg.abstraction_max_tokens, 1024);
    assert!(!cfg.contradiction_check_enabled);
}
788
789 // ----------------------------------------------------------------
790 // v0.11.1 — from_settings_then_env layering
791 // ----------------------------------------------------------------
792
/// Both inputs `None` AND no env set → result equals
/// `StewardConfig::default()`. Pins the backward-compat path: pre-
/// v0.11.1 configs (no `[steward]` block) and no env vars must not
/// change daemon behaviour.
#[test]
fn from_settings_then_env_no_overrides_yields_default() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let _g = clear_env();
    let cfg = StewardConfig::from_settings_then_env(None, None)
        .expect("no inputs, no env, must succeed");
    // Compared field by field: `StewardConfig` derives only
    // `Debug` and `Clone`, not `PartialEq`.
    let d = StewardConfig::default();
    assert_eq!(cfg.cluster_min_size, d.cluster_min_size);
    assert!((cfg.cluster_cosine_threshold - d.cluster_cosine_threshold).abs() < 1e-6);
    assert_eq!(cfg.abstraction_max_tokens, d.abstraction_max_tokens);
    assert_eq!(cfg.contradiction_check_enabled, d.contradiction_check_enabled);
}
810
/// TOML-supplied values flow through when env vars are unset.
/// This is the v0.11.1 happy path: operator writes
/// `[steward] cluster_min_size = 4` + `cluster_cosine_threshold = 0.7`
/// and that's exactly what the daemon uses for clustering.
#[test]
fn from_settings_then_env_toml_values_take_effect_without_env() {
    let _lock =
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    // Start from a clean environment so only the TOML inputs apply.
    let _g = clear_env();
    let cfg = StewardConfig::from_settings_then_env(Some(4), Some(0.7))
        .expect("valid TOML inputs");
    assert_eq!(cfg.cluster_min_size, 4);
    assert!((cfg.cluster_cosine_threshold - 0.7).abs() < 1e-6);
}
825
/// Precedence pin: when both surfaces supply a value, the env var wins.
/// This keeps the operator's runtime escape hatch working in v0.11.1
/// (export `SOLO_CLUSTER_*` in the shell to override the config file
/// without editing it).
#[test]
fn from_settings_then_env_env_wins_over_toml() {
    // Hold `ENV_LOCK` for the whole test; recover from poisoning.
    let _env_serial = ENV_LOCK
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let _min_size_guard = set_named_env(ENV_CLUSTER_MIN_SIZE, "8");
    let _threshold_guard = set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, "0.95");

    // TOML side requests (3, 0.5); env side requests (8, 0.95).
    let resolved = StewardConfig::from_settings_then_env(Some(3), Some(0.5))
        .expect("env wins over TOML, both valid");

    assert_eq!(resolved.cluster_min_size, 8, "env override beats TOML");
    let threshold_gap = (resolved.cluster_cosine_threshold - 0.95).abs();
    assert!(threshold_gap < 1e-6, "env override beats TOML");
}
844
/// Per-field precedence (not all-or-nothing): the env supplies only
/// `SOLO_CLUSTER_MIN_SIZE`, TOML supplies only the cosine threshold,
/// and the resolved config takes each field from the surface that set
/// it.
///
/// NOTE(review): unlike its siblings this test does not call
/// `clear_env()`, so it presumably relies on the threshold env var
/// being absent from the ambient environment — confirm.
#[test]
fn from_settings_then_env_partial_env_keeps_toml_for_untouched_field() {
    // Hold `ENV_LOCK` for the whole test; recover from poisoning.
    let _env_serial = ENV_LOCK
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let _min_size_guard = set_named_env(ENV_CLUSTER_MIN_SIZE, "9");

    let resolved = StewardConfig::from_settings_then_env(None, Some(0.42))
        .expect("env on min_size, TOML on threshold");

    assert_eq!(resolved.cluster_min_size, 9, "env override applied");
    let threshold_gap = (resolved.cluster_cosine_threshold - 0.42).abs();
    assert!(
        threshold_gap < 1e-6,
        "env unset for threshold → TOML 0.42 survives"
    );
}
862
/// Out-of-range TOML inputs must come back as `Error::InvalidInput`
/// instead of silently falling back to a default, so the operator gets
/// the same strictness the env-var parser applies. Covers three bad
/// thresholds plus a zero `cluster_min_size`.
#[test]
fn from_settings_then_env_rejects_bad_toml_threshold() {
    // Hold `ENV_LOCK` for the whole test; recover from poisoning.
    let _env_serial = ENV_LOCK
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let _env_guard = clear_env();

    // Threshold above 1.0 is out of range.
    let too_high = StewardConfig::from_settings_then_env(None, Some(1.5));
    assert!(matches!(too_high, Err(Error::InvalidInput(_))));

    // Threshold must be strictly greater than 0.0.
    let zero_threshold = StewardConfig::from_settings_then_env(None, Some(0.0));
    assert!(matches!(zero_threshold, Err(Error::InvalidInput(_))));

    // NaN threshold.
    let nan_threshold = StewardConfig::from_settings_then_env(None, Some(f32::NAN));
    assert!(matches!(nan_threshold, Err(Error::InvalidInput(_))));

    // A minimum cluster size of zero is rejected as well.
    let zero_min_size = StewardConfig::from_settings_then_env(Some(0), None);
    assert!(matches!(zero_min_size, Err(Error::InvalidInput(_))));
}
889}
890
#[cfg(test)]
mod has_llm_tests {
    use super::*;
    use crate::test_support::StubLlmClient;

    /// With a plain default stub behind it, `Steward::has_llm()` is
    /// `false`. The writer's contradiction sweep gates on this value
    /// and must skip when only a stub is configured.
    #[test]
    fn has_llm_false_for_default_stub() {
        let plain_stub = Arc::new(StubLlmClient::default_stub());
        let steward = Steward::new(plain_stub, StewardConfig::default());
        let reports_llm = steward.has_llm();
        assert!(!reports_llm);
    }

    /// Once the stub is built with `pretend_real_llm(true)`,
    /// `Steward::has_llm()` is `true`. Storage-layer integration tests
    /// use this to drive the contradiction-sweep code path against
    /// canned responses.
    #[test]
    fn has_llm_true_when_stub_pretends() {
        let pretending_stub = Arc::new(StubLlmClient::default_stub().pretend_real_llm(true));
        let steward = Steward::new(pretending_stub, StewardConfig::default());
        let reports_llm = steward.has_llm();
        assert!(reports_llm);
    }
}
919
920// ----------------------------------------------------------------
921// v0.10.1 (P4 audit m5): extract_triples_batch per-cluster timeout
922// ----------------------------------------------------------------
#[cfg(test)]
mod extract_triples_batch_timeout_tests {
    use super::*;
    use crate::test_support::StubLlmClient;
    use async_trait::async_trait;
    use solo_core::{
        Confidence, Embedding, EmbeddingDtype, Episode, MemoryId, Message, Role, Tier,
    };
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// Test-only `LlmClient` whose `complete()` sleeps for a per-call
    /// number of milliseconds and then returns a parseable abstraction
    /// response. It exists to make `tokio::time::timeout` fire inside
    /// `extract_triples_batch`.
    ///
    /// Call N (0-indexed) sleeps `delays_ms[N]`; once N runs past the
    /// end of the list the LAST entry is reused, so `vec![5_000]` makes
    /// every call slow. An empty list means no delay at all.
    struct SlowStub {
        delays_ms: Vec<u64>,
        calls: AtomicUsize,
    }

    impl SlowStub {
        /// Builds a stub with the given delay schedule and a zeroed
        /// call counter.
        fn new(delays_ms: Vec<u64>) -> Self {
            let calls = AtomicUsize::new(0);
            Self { delays_ms, calls }
        }

        /// Number of `complete()` calls started so far.
        fn call_count(&self) -> usize {
            self.calls.load(Ordering::Relaxed)
        }
    }

    #[async_trait]
    impl LlmClient for SlowStub {
        fn name(&self) -> &str {
            "slow-stub"
        }

        async fn complete(&self, _messages: &[Message]) -> Result<Message> {
            // The counter is bumped before sleeping, so a call that is
            // later cancelled by a timeout still counts as attempted.
            let call_index = self.calls.fetch_add(1, Ordering::Relaxed);
            let delay_ms = match self.delays_ms.get(call_index) {
                Some(&scheduled) => scheduled,
                None => self.delays_ms.last().copied().unwrap_or(0),
            };
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;

            let reply = Message {
                role: Role::Assistant,
                content: r#"{"content":"ok","confidence":0.7,"triples":[]}"#.into(),
            };
            Ok(reply)
        }

        fn is_real_llm(&self) -> bool {
            true
        }
    }

    /// Builds one minimal cluster together with its single episode so
    /// `abstract_cluster` has well-formed input. The actual contents are
    /// irrelevant because `SlowStub` ignores the prompt.
    fn mk_cluster_with_episode() -> (Cluster, Vec<Episode>) {
        let cluster_id = MemoryId::new();
        let memory_id = MemoryId::new();

        let unit_x = [1.0f32, 0.0, 0.0, 0.0];
        let cluster = Cluster {
            cluster_id,
            episode_ids: vec![memory_id],
            centroid: Some(Embedding {
                data: bytemuck::cast_slice(&unit_x).to_vec(),
                dim: 4,
                dtype: EmbeddingDtype::F32,
            }),
            coherence: 0.95,
        };

        let episode = Episode {
            memory_id,
            ts_ms: 0,
            source_type: "user_message".into(),
            content: "hello".into(),
            encoding_context: Default::default(),
            provenance: None,
            confidence: Confidence::new(0.9).unwrap(),
            strength: 0.5,
            salience: 0.5,
            tier: Tier::Hot,
            source_id: None,
        };

        (cluster, vec![episode])
    }

    /// `n` independent single-episode clusters, in batch-input shape.
    fn clusters(n: usize) -> Vec<(Cluster, Vec<Episode>)> {
        std::iter::repeat_with(mk_cluster_with_episode)
            .take(n)
            .collect()
    }

    /// Two-worker multi-thread tokio runtime with all drivers enabled
    /// (the time driver is what the sleeps and timeouts need).
    fn rt() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.worker_threads(2).enable_all();
        builder.build().unwrap()
    }

    /// m5 test 1 (the main pin): a cluster whose `abstract_cluster`
    /// call outlives the per-cluster timeout does not stop the batch.
    /// The remaining clusters still run, and the hung one is reported
    /// as deferred rather than being lumped in with errors.
    #[test]
    fn extract_triples_batch_continues_after_cluster_timeout() {
        rt().block_on(async {
            // Per-call delays: 50ms (fast), 3s (far beyond the 200ms
            // timeout), 50ms (fast).
            let stub = Arc::new(SlowStub::new(vec![50, 3_000, 50]));
            let steward = Steward::new(stub.clone(), StewardConfig::default());

            let per_cluster_timeout = Duration::from_millis(200);
            let outcome = steward
                .extract_triples_batch(clusters(3), per_cluster_timeout)
                .await;

            assert_eq!(
                outcome.abstractions.len(),
                2,
                "clusters 1 + 3 must succeed while cluster 2 times out"
            );
            assert_eq!(
                outcome.deferred_count, 1,
                "cluster 2 must be counted as deferred (timeout), not failed"
            );
            // All three calls must have been started, i.e. the loop
            // kept going after the timeout.
            assert_eq!(
                stub.call_count(),
                3,
                "every cluster's abstract_cluster must be attempted; \
                 a timeout on one MUST NOT abort the batch"
            );
        });
    }

    /// m5 test 2: happy path. Every cluster finishes well inside the
    /// timeout, so nothing is deferred; the timeout path must not
    /// introduce spurious deferrals.
    #[test]
    fn extract_triples_batch_returns_succeeded_count() {
        rt().block_on(async {
            let quick_stub = Arc::new(SlowStub::new(vec![10]));
            let steward = Steward::new(quick_stub, StewardConfig::default());

            let outcome = steward
                .extract_triples_batch(clusters(3), Duration::from_secs(5))
                .await;

            assert_eq!(outcome.abstractions.len(), 3);
            assert_eq!(
                outcome.deferred_count, 0,
                "no clusters exceeded the timeout"
            );
        });
    }

    /// m5 test 3: a cluster that returns `Err` is not a deferral.
    /// `deferred_count` tracks timeouts only; failed clusters are the
    /// implicit remainder `cluster_count - abstractions_built -
    /// deferred_count`.
    ///
    /// The failure comes from a `StubLlmClient` whose canned reply does
    /// not parse, so `abstract_cluster` returns an `Err`. `SlowStub`
    /// has no failure mode and adding one would complicate it.
    #[test]
    fn extract_triples_batch_failure_distinct_from_timeout() {
        rt().block_on(async {
            // Non-JSON reply → abstraction parse fails → Err.
            // `pretend_real_llm(true)` is still required so that
            // `has_llm()` returns true.
            let garbage_stub = Arc::new(
                StubLlmClient::with_canned("garbage-stub", "NOT-JSON").pretend_real_llm(true),
            );
            let steward = Steward::new(garbage_stub, StewardConfig::default());

            let outcome = steward
                .extract_triples_batch(clusters(1), Duration::from_secs(5))
                .await;

            assert_eq!(
                outcome.abstractions.len(),
                0,
                "the parse failure means no abstraction lands"
            );
            assert_eq!(
                outcome.deferred_count, 0,
                "an Err return is NOT a deferral — only timeout-elapsed \
                 counts as deferred. failed clusters are computed by the \
                 caller as cluster_count - abstractions_built - deferred"
            );
        });
    }

    /// m5 test 4: `Duration::ZERO` switches the per-cluster timeout off.
    /// This is the operator escape hatch (`[triples]
    /// cluster_timeout_secs = 0` → `Duration::ZERO`): the delayed
    /// cluster still completes because the call is never wrapped in a
    /// timeout.
    #[test]
    fn extract_triples_batch_zero_timeout_disables_per_cluster_timeout() {
        rt().block_on(async {
            // 100ms delay: short, yet long enough that a zero-length
            // timeout, were one actually applied, would elapse before
            // the call returns.
            let delayed_stub = Arc::new(SlowStub::new(vec![100]));
            let steward = Steward::new(delayed_stub, StewardConfig::default());

            let outcome = steward
                .extract_triples_batch(clusters(1), Duration::ZERO)
                .await;

            assert_eq!(
                outcome.abstractions.len(),
                1,
                "zero Duration disables the timeout; the cluster must \
                 succeed even with a delay"
            );
            assert_eq!(outcome.deferred_count, 0);
        });
    }

    /// m5 test 5: a steward without an LLM (`has_llm() == false`)
    /// short-circuits to an empty outcome whatever the per-cluster
    /// timeout is. Pins the fast path.
    #[test]
    fn extract_triples_batch_no_llm_returns_empty_outcome() {
        rt().block_on(async {
            // default stub: is_real_llm == false → has_llm == false.
            let plain_stub = Arc::new(StubLlmClient::default_stub());
            let steward = Steward::new(plain_stub, StewardConfig::default());

            let outcome = steward
                .extract_triples_batch(clusters(1), Duration::from_secs(5))
                .await;

            assert!(outcome.abstractions.is_empty());
            assert_eq!(outcome.deferred_count, 0);
        });
    }

    /// m5 test 6: the `Duration` argument is what actually governs the
    /// timeout. Test 1 already pins that a deferral can happen; this
    /// one guards against a hardcoded value by checking both
    /// directions with the same call shape: a 50ms call under a 1s
    /// timeout succeeds, and an identical 50ms call under a 10ms
    /// timeout defers.
    #[test]
    fn extract_triples_batch_uses_passed_timeout() {
        rt().block_on(async {
            // Generous bound: 50ms call, 1s timeout → success.
            let relaxed_stub = Arc::new(SlowStub::new(vec![50]));
            let relaxed_steward = Steward::new(relaxed_stub, StewardConfig::default());
            let relaxed = relaxed_steward
                .extract_triples_batch(clusters(1), Duration::from_secs(1))
                .await;
            assert_eq!(relaxed.abstractions.len(), 1);
            assert_eq!(relaxed.deferred_count, 0);

            // Tight bound: same 50ms call, 10ms timeout → deferral.
            let tight_stub = Arc::new(SlowStub::new(vec![50]));
            let tight_steward = Steward::new(tight_stub, StewardConfig::default());
            let tight = tight_steward
                .extract_triples_batch(clusters(1), Duration::from_millis(10))
                .await;
            assert_eq!(tight.abstractions.len(), 0);
            assert_eq!(tight.deferred_count, 1);
        });
    }
}
1221}