use std::sync::Arc;
use std::time::Duration;
use solo_core::{
Cluster, Contradiction, Embedding, Episode, Error, LlmClient, Result, SemanticAbstraction,
Triple,
};
pub const ENV_CLUSTER_COSINE_THRESHOLD: &str =
"SOLO_CLUSTER_COSINE_THRESHOLD";
pub const ENV_CLUSTER_MIN_SIZE: &str = "SOLO_CLUSTER_MIN_SIZE";
pub const ENV_ABSTRACTION_MAX_TOKENS: &str =
"SOLO_ABSTRACTION_MAX_TOKENS";
pub const ENV_CONTRADICTION_CHECK_ENABLED: &str =
"SOLO_CONTRADICTION_CHECK_ENABLED";
pub mod abstraction;
pub mod cluster;
pub mod contradiction;
#[cfg(any(test, feature = "test-support"))]
pub mod test_support;
pub struct Steward {
client: Arc<dyn LlmClient>,
config: StewardConfig,
}
#[derive(Debug, Clone)]
pub struct StewardConfig {
pub cluster_min_size: usize,
pub cluster_cosine_threshold: f32,
pub abstraction_max_tokens: usize,
pub contradiction_check_enabled: bool,
}
impl Default for StewardConfig {
fn default() -> Self {
Self {
cluster_min_size: 2,
cluster_cosine_threshold: 0.55,
abstraction_max_tokens: 512,
contradiction_check_enabled: true,
}
}
}
#[derive(Debug, Default, Clone)]
pub struct ExtractTriplesBatchOutcome {
pub abstractions: Vec<(solo_core::MemoryId, SemanticAbstraction)>,
pub deferred_count: usize,
}
fn env_trimmed(name: &str) -> Option<String> {
std::env::var(name).ok().and_then(|s| {
let t = s.trim();
if t.is_empty() {
None
} else {
Some(t.to_string())
}
})
}
impl StewardConfig {
pub fn from_env() -> Result<Self> {
let mut cfg = Self::default();
if let Some(raw) = env_trimmed(ENV_CLUSTER_COSINE_THRESHOLD) {
let parsed: f32 = raw.parse().map_err(|_| {
Error::invalid_input(format!(
"{ENV_CLUSTER_COSINE_THRESHOLD}: not a valid f32 ({raw:?})"
))
})?;
if !parsed.is_finite() || parsed <= 0.0 || parsed > 1.0 {
return Err(Error::invalid_input(format!(
"{ENV_CLUSTER_COSINE_THRESHOLD}: must be a finite f32 in (0.0, 1.0], got {parsed}"
)));
}
cfg.cluster_cosine_threshold = parsed;
}
if let Some(raw) = env_trimmed(ENV_CLUSTER_MIN_SIZE) {
let parsed: usize = raw.parse().map_err(|_| {
Error::invalid_input(format!(
"{ENV_CLUSTER_MIN_SIZE}: not a valid non-negative integer ({raw:?})"
))
})?;
if parsed < 1 {
return Err(Error::invalid_input(format!(
"{ENV_CLUSTER_MIN_SIZE}: must be >= 1, got {parsed}"
)));
}
cfg.cluster_min_size = parsed;
}
if let Some(raw) = env_trimmed(ENV_ABSTRACTION_MAX_TOKENS) {
let parsed: usize = raw.parse().map_err(|_| {
Error::invalid_input(format!(
"{ENV_ABSTRACTION_MAX_TOKENS}: not a valid non-negative integer ({raw:?})"
))
})?;
if !(1..=65_536).contains(&parsed) {
return Err(Error::invalid_input(format!(
"{ENV_ABSTRACTION_MAX_TOKENS}: must be in [1, 65536], got {parsed}"
)));
}
cfg.abstraction_max_tokens = parsed;
}
if let Some(raw) = env_trimmed(ENV_CONTRADICTION_CHECK_ENABLED) {
let parsed = match raw.to_ascii_lowercase().as_str() {
"true" => true,
"false" => false,
_ => {
return Err(Error::invalid_input(format!(
"{ENV_CONTRADICTION_CHECK_ENABLED}: must be \"true\" or \"false\" (case-insensitive), got {raw:?}"
)));
}
};
cfg.contradiction_check_enabled = parsed;
}
Ok(cfg)
}
pub fn from_settings_then_env(
toml_min_size: Option<usize>,
toml_cosine_threshold: Option<f32>,
) -> Result<Self> {
let mut cfg = Self::default();
if let Some(parsed) = toml_min_size {
if parsed < 1 {
return Err(Error::invalid_input(format!(
"[steward] cluster_min_size: must be >= 1, got {parsed}"
)));
}
cfg.cluster_min_size = parsed;
}
if let Some(parsed) = toml_cosine_threshold {
if !parsed.is_finite() || parsed <= 0.0 || parsed > 1.0 {
return Err(Error::invalid_input(format!(
"[steward] cluster_cosine_threshold: must be a finite f32 in (0.0, 1.0], got {parsed}"
)));
}
cfg.cluster_cosine_threshold = parsed;
}
let env_cfg = Self::from_env()?;
let env_default = Self::default();
if (env_cfg.cluster_min_size, env_cfg.cluster_cosine_threshold)
!= (env_default.cluster_min_size, env_default.cluster_cosine_threshold)
{
if env_cfg.cluster_min_size != env_default.cluster_min_size {
cfg.cluster_min_size = env_cfg.cluster_min_size;
}
if env_cfg.cluster_cosine_threshold != env_default.cluster_cosine_threshold {
cfg.cluster_cosine_threshold = env_cfg.cluster_cosine_threshold;
}
}
cfg.abstraction_max_tokens = env_cfg.abstraction_max_tokens;
cfg.contradiction_check_enabled = env_cfg.contradiction_check_enabled;
Ok(cfg)
}
}
impl Steward {
pub fn new(client: Arc<dyn LlmClient>, config: StewardConfig) -> Self {
Self { client, config }
}
pub fn config(&self) -> &StewardConfig {
&self.config
}
pub fn has_llm(&self) -> bool {
self.client.is_real_llm()
}
pub async fn cluster_episodes(
&self,
inputs: &[(Episode, Embedding)],
) -> Result<Vec<Cluster>> {
cluster::cluster_episodes(inputs, &self.config)
}
pub async fn abstract_cluster(
&self,
cluster: &Cluster,
episodes: &[Episode],
) -> Result<SemanticAbstraction> {
abstraction::abstract_cluster(cluster, episodes, self.client.as_ref()).await
}
pub async fn extract_triples_batch(
&self,
clusters_with_episodes: Vec<(Cluster, Vec<Episode>)>,
per_cluster_timeout: Duration,
) -> ExtractTriplesBatchOutcome {
if !self.has_llm() {
return ExtractTriplesBatchOutcome::default();
}
let mut abstractions: Vec<(solo_core::MemoryId, SemanticAbstraction)> =
Vec::with_capacity(clusters_with_episodes.len());
let mut deferred_count: usize = 0;
let timeout_enabled = !per_cluster_timeout.is_zero();
for (cluster, episodes) in clusters_with_episodes {
let cluster_id = cluster.cluster_id;
let call = self.abstract_cluster(&cluster, &episodes);
let result = if timeout_enabled {
match tokio::time::timeout(per_cluster_timeout, call).await {
Ok(inner) => inner,
Err(_elapsed) => {
tracing::warn!(
cluster_id = %cluster_id,
timeout_secs = per_cluster_timeout.as_secs(),
"v0.10.1 m5 extract_triples_batch: abstract_cluster \
timed out; deferring cluster to next batch tick \
(the cluster's lack of a semantic_abstractions \
row will re-select it on the next pass)"
);
deferred_count += 1;
continue;
}
}
} else {
call.await
};
match result {
Ok(abs) => {
abstractions.push((cluster_id, abs));
}
Err(e) => {
tracing::warn!(
cluster_id = %cluster_id,
error = %e,
"v0.9.0 P4c extract_triples_batch: abstract_cluster \
failed; cluster persists, abstraction retries on \
next tick"
);
}
}
}
ExtractTriplesBatchOutcome {
abstractions,
deferred_count,
}
}
pub async fn detect_contradiction(
&self,
a: &Triple,
b: &Triple,
) -> Result<Option<Contradiction>> {
contradiction::detect_contradiction(a, b, self.client.as_ref()).await
}
}
#[cfg(test)]
mod from_env_tests {
use super::*;
use std::sync::Mutex;
static ENV_LOCK: Mutex<()> = Mutex::new(());
struct EnvGuard;
impl Drop for EnvGuard {
fn drop(&mut self) {
for k in [
ENV_CLUSTER_COSINE_THRESHOLD,
ENV_CLUSTER_MIN_SIZE,
ENV_ABSTRACTION_MAX_TOKENS,
ENV_CONTRADICTION_CHECK_ENABLED,
] {
unsafe { std::env::remove_var(k) };
}
}
}
fn set_env(value: &str) -> EnvGuard {
set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, value)
}
fn set_named_env(name: &str, value: &str) -> EnvGuard {
unsafe { std::env::set_var(name, value) };
EnvGuard
}
fn clear_env() -> EnvGuard {
for k in [
ENV_CLUSTER_COSINE_THRESHOLD,
ENV_CLUSTER_MIN_SIZE,
ENV_ABSTRACTION_MAX_TOKENS,
ENV_CONTRADICTION_CHECK_ENABLED,
] {
unsafe { std::env::remove_var(k) };
}
EnvGuard
}
#[test]
fn unset_env_yields_default_threshold() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = clear_env();
let cfg = StewardConfig::from_env().expect("ok");
assert_eq!(
cfg.cluster_cosine_threshold,
StewardConfig::default().cluster_cosine_threshold
);
}
#[test]
fn empty_or_whitespace_yields_default_threshold() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
for value in ["", " ", "\t"] {
let _g = set_env(value);
let cfg = StewardConfig::from_env().expect("ok");
assert_eq!(
cfg.cluster_cosine_threshold,
StewardConfig::default().cluster_cosine_threshold,
"unexpected override for empty value {value:?}"
);
}
}
#[test]
fn valid_value_overrides_default() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_env("0.92");
let cfg = StewardConfig::from_env().expect("0.92 is valid");
assert!(
(cfg.cluster_cosine_threshold - 0.92).abs() < 1e-6,
"got {}",
cfg.cluster_cosine_threshold
);
}
#[test]
fn boundary_one_is_accepted() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_env("1.0");
let cfg = StewardConfig::from_env().expect("1.0 is valid");
assert_eq!(cfg.cluster_cosine_threshold, 1.0);
}
#[test]
fn unparseable_value_errors() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_env("not-a-number");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)), "got {err:?}");
}
#[test]
fn zero_is_rejected() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_env("0.0");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
#[test]
fn negative_is_rejected() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_env("-0.1");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
#[test]
fn above_one_is_rejected() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_env("1.01");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
#[test]
fn nan_is_rejected() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_env("NaN");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
#[test]
fn cluster_min_size_valid_overrides_default() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "5");
let cfg = StewardConfig::from_env().expect("5 is valid");
assert_eq!(cfg.cluster_min_size, 5);
}
#[test]
fn cluster_min_size_zero_is_rejected() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "0");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
#[test]
fn cluster_min_size_unparseable_errors() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "not-a-number");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
#[test]
fn abstraction_max_tokens_valid_overrides_default() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "1024");
let cfg = StewardConfig::from_env().expect("1024 is valid");
assert_eq!(cfg.abstraction_max_tokens, 1024);
}
#[test]
fn abstraction_max_tokens_above_upper_bound_is_rejected() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "131072");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
#[test]
fn abstraction_max_tokens_zero_is_rejected() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "0");
let err = StewardConfig::from_env().unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
#[test]
fn contradiction_check_false_overrides_default_true() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, "false");
let cfg = StewardConfig::from_env().expect("false is valid");
assert!(!cfg.contradiction_check_enabled);
}
#[test]
fn contradiction_check_accepts_case_insensitive() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
for value in ["True", "TRUE", "tRuE"] {
let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
let cfg =
StewardConfig::from_env().expect("case-insensitive true");
assert!(cfg.contradiction_check_enabled, "got false for {value:?}");
}
for value in ["False", "FALSE", "fAlSe"] {
let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
let cfg =
StewardConfig::from_env().expect("case-insensitive false");
assert!(!cfg.contradiction_check_enabled, "got true for {value:?}");
}
}
#[test]
fn contradiction_check_rejects_non_bool_strings() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
for value in ["1", "0", "yes", "no", "on", "off", "maybe"] {
let _g = set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, value);
let err = StewardConfig::from_env().unwrap_err();
assert!(
matches!(err, Error::InvalidInput(_)),
"expected error for {value:?}"
);
}
}
#[test]
fn all_four_env_vars_set_simultaneously_take_effect() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g1 = set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, "0.91");
let _g2 = set_named_env(ENV_CLUSTER_MIN_SIZE, "5");
let _g3 = set_named_env(ENV_ABSTRACTION_MAX_TOKENS, "1024");
let _g4 =
set_named_env(ENV_CONTRADICTION_CHECK_ENABLED, "false");
let cfg =
StewardConfig::from_env().expect("all four valid together");
assert!((cfg.cluster_cosine_threshold - 0.91).abs() < 1e-6);
assert_eq!(cfg.cluster_min_size, 5);
assert_eq!(cfg.abstraction_max_tokens, 1024);
assert!(!cfg.contradiction_check_enabled);
}
#[test]
fn from_settings_then_env_no_overrides_yields_default() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = clear_env();
let cfg = StewardConfig::from_settings_then_env(None, None)
.expect("no inputs, no env, must succeed");
let d = StewardConfig::default();
assert_eq!(cfg.cluster_min_size, d.cluster_min_size);
assert!((cfg.cluster_cosine_threshold - d.cluster_cosine_threshold).abs() < 1e-6);
assert_eq!(cfg.abstraction_max_tokens, d.abstraction_max_tokens);
assert_eq!(cfg.contradiction_check_enabled, d.contradiction_check_enabled);
}
#[test]
fn from_settings_then_env_toml_values_take_effect_without_env() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = clear_env();
let cfg = StewardConfig::from_settings_then_env(Some(4), Some(0.7))
.expect("valid TOML inputs");
assert_eq!(cfg.cluster_min_size, 4);
assert!((cfg.cluster_cosine_threshold - 0.7).abs() < 1e-6);
}
#[test]
fn from_settings_then_env_env_wins_over_toml() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g1 = set_named_env(ENV_CLUSTER_MIN_SIZE, "8");
let _g2 = set_named_env(ENV_CLUSTER_COSINE_THRESHOLD, "0.95");
let cfg = StewardConfig::from_settings_then_env(Some(3), Some(0.5))
.expect("env wins over TOML, both valid");
assert_eq!(cfg.cluster_min_size, 8, "env override beats TOML");
assert!(
(cfg.cluster_cosine_threshold - 0.95).abs() < 1e-6,
"env override beats TOML"
);
}
#[test]
fn from_settings_then_env_partial_env_keeps_toml_for_untouched_field() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = set_named_env(ENV_CLUSTER_MIN_SIZE, "9");
let cfg = StewardConfig::from_settings_then_env(None, Some(0.42))
.expect("env on min_size, TOML on threshold");
assert_eq!(cfg.cluster_min_size, 9, "env override applied");
assert!(
(cfg.cluster_cosine_threshold - 0.42).abs() < 1e-6,
"env unset for threshold → TOML 0.42 survives"
);
}
#[test]
fn from_settings_then_env_rejects_bad_toml_threshold() {
let _lock =
ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let _g = clear_env();
let err =
StewardConfig::from_settings_then_env(None, Some(1.5)).unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
let err =
StewardConfig::from_settings_then_env(None, Some(0.0)).unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
let err =
StewardConfig::from_settings_then_env(None, Some(f32::NAN)).unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
let err =
StewardConfig::from_settings_then_env(Some(0), None).unwrap_err();
assert!(matches!(err, Error::InvalidInput(_)));
}
}
#[cfg(test)]
mod has_llm_tests {
use super::*;
use crate::test_support::StubLlmClient;
#[test]
fn has_llm_false_for_default_stub() {
let stub = Arc::new(StubLlmClient::default_stub());
let s = Steward::new(stub, StewardConfig::default());
assert!(!s.has_llm());
}
#[test]
fn has_llm_true_when_stub_pretends() {
let stub = Arc::new(
StubLlmClient::default_stub().pretend_real_llm(true),
);
let s = Steward::new(stub, StewardConfig::default());
assert!(s.has_llm());
}
}
#[cfg(test)]
mod extract_triples_batch_timeout_tests {
use super::*;
use crate::test_support::StubLlmClient;
use async_trait::async_trait;
use solo_core::{
Confidence, Embedding, EmbeddingDtype, Episode, MemoryId, Message, Role, Tier,
};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
struct SlowStub {
delays_ms: Vec<u64>,
calls: AtomicUsize,
}
impl SlowStub {
fn new(delays_ms: Vec<u64>) -> Self {
Self {
delays_ms,
calls: AtomicUsize::new(0),
}
}
fn call_count(&self) -> usize {
self.calls.load(Ordering::Relaxed)
}
}
#[async_trait]
impl LlmClient for SlowStub {
fn name(&self) -> &str {
"slow-stub"
}
async fn complete(&self, _messages: &[Message]) -> Result<Message> {
let idx = self.calls.fetch_add(1, Ordering::Relaxed);
let delay = self
.delays_ms
.get(idx)
.or_else(|| self.delays_ms.last())
.copied()
.unwrap_or(0);
tokio::time::sleep(Duration::from_millis(delay)).await;
Ok(Message {
role: Role::Assistant,
content: r#"{"content":"ok","confidence":0.7,"triples":[]}"#
.into(),
})
}
fn is_real_llm(&self) -> bool {
true
}
}
fn mk_cluster_with_episode() -> (Cluster, Vec<Episode>) {
let cluster_id = MemoryId::new();
let memory_id = MemoryId::new();
let episode = Episode {
memory_id,
ts_ms: 0,
source_type: "user_message".into(),
content: "hello".into(),
encoding_context: Default::default(),
provenance: None,
confidence: Confidence::new(0.9).unwrap(),
strength: 0.5,
salience: 0.5,
tier: Tier::Hot,
source_id: None,
};
let centroid = Embedding {
data: bytemuck::cast_slice(&[1.0f32, 0.0, 0.0, 0.0]).to_vec(),
dim: 4,
dtype: EmbeddingDtype::F32,
};
let cluster = Cluster {
cluster_id,
episode_ids: vec![memory_id],
centroid: Some(centroid),
coherence: 0.95,
};
(cluster, vec![episode])
}
fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.unwrap()
}
#[test]
fn extract_triples_batch_continues_after_cluster_timeout() {
let rt = rt();
rt.block_on(async {
let stub = Arc::new(SlowStub::new(vec![50, 3_000, 50]));
let steward = Steward::new(stub.clone(), StewardConfig::default());
let inputs = vec![
mk_cluster_with_episode(),
mk_cluster_with_episode(),
mk_cluster_with_episode(),
];
let outcome = steward
.extract_triples_batch(inputs, Duration::from_millis(200))
.await;
assert_eq!(
outcome.abstractions.len(),
2,
"clusters 1 + 3 must succeed while cluster 2 times out"
);
assert_eq!(
outcome.deferred_count, 1,
"cluster 2 must be counted as deferred (timeout), not failed"
);
assert_eq!(
stub.call_count(),
3,
"every cluster's abstract_cluster must be attempted; \
a timeout on one MUST NOT abort the batch"
);
});
}
#[test]
fn extract_triples_batch_returns_succeeded_count() {
let rt = rt();
rt.block_on(async {
let stub = Arc::new(SlowStub::new(vec![10]));
let steward = Steward::new(stub, StewardConfig::default());
let inputs = vec![
mk_cluster_with_episode(),
mk_cluster_with_episode(),
mk_cluster_with_episode(),
];
let outcome = steward
.extract_triples_batch(inputs, Duration::from_secs(5))
.await;
assert_eq!(outcome.abstractions.len(), 3);
assert_eq!(
outcome.deferred_count, 0,
"no clusters exceeded the timeout"
);
});
}
#[test]
fn extract_triples_batch_failure_distinct_from_timeout() {
let rt = rt();
rt.block_on(async {
let stub = Arc::new(
StubLlmClient::with_canned("garbage-stub", "NOT-JSON")
.pretend_real_llm(true),
);
let steward = Steward::new(stub, StewardConfig::default());
let inputs = vec![mk_cluster_with_episode()];
let outcome = steward
.extract_triples_batch(inputs, Duration::from_secs(5))
.await;
assert_eq!(
outcome.abstractions.len(),
0,
"the parse failure means no abstraction lands"
);
assert_eq!(
outcome.deferred_count, 0,
"an Err return is NOT a deferral — only timeout-elapsed \
counts as deferred. failed clusters are computed by the \
caller as cluster_count - abstractions_built - deferred"
);
});
}
#[test]
fn extract_triples_batch_zero_timeout_disables_per_cluster_timeout() {
let rt = rt();
rt.block_on(async {
let stub = Arc::new(SlowStub::new(vec![100]));
let steward = Steward::new(stub, StewardConfig::default());
let inputs = vec![mk_cluster_with_episode()];
let outcome = steward
.extract_triples_batch(inputs, Duration::ZERO)
.await;
assert_eq!(
outcome.abstractions.len(),
1,
"zero Duration disables the timeout; the cluster must \
succeed even with a delay"
);
assert_eq!(outcome.deferred_count, 0);
});
}
#[test]
fn extract_triples_batch_no_llm_returns_empty_outcome() {
let rt = rt();
rt.block_on(async {
let stub = Arc::new(StubLlmClient::default_stub());
let steward = Steward::new(stub, StewardConfig::default());
let inputs = vec![mk_cluster_with_episode()];
let outcome = steward
.extract_triples_batch(inputs, Duration::from_secs(5))
.await;
assert!(outcome.abstractions.is_empty());
assert_eq!(outcome.deferred_count, 0);
});
}
#[test]
fn extract_triples_batch_uses_passed_timeout() {
let rt = rt();
rt.block_on(async {
let stub = Arc::new(SlowStub::new(vec![50]));
let steward = Steward::new(stub, StewardConfig::default());
let outcome = steward
.extract_triples_batch(
vec![mk_cluster_with_episode()],
Duration::from_secs(1),
)
.await;
assert_eq!(outcome.abstractions.len(), 1);
assert_eq!(outcome.deferred_count, 0);
let stub2 = Arc::new(SlowStub::new(vec![50]));
let steward2 = Steward::new(stub2, StewardConfig::default());
let outcome2 = steward2
.extract_triples_batch(
vec![mk_cluster_with_episode()],
Duration::from_millis(10),
)
.await;
assert_eq!(outcome2.abstractions.len(), 0);
assert_eq!(outcome2.deferred_count, 1);
});
}
}