use super::*;
use crate::core::embed::MockEmbedder;
use std::time::Duration;
fn make_pool(workers: usize) -> EmbedPool {
let embedder: Arc<dyn Embedder> = Arc::new(MockEmbedder::new(384));
EmbedPool::new(workers, embedder)
}
#[tokio::test]
async fn embed_returns_vector_per_text() {
let pool = make_pool(2);
let out = pool
.embed(
vec!["hello".into(), "world".into()],
RequestPriority::Interactive,
)
.await
.expect("embed succeeds");
assert_eq!(out.len(), 2);
assert_eq!(out[0].len(), 384);
}
#[tokio::test]
async fn embed_handles_empty_input() {
let pool = make_pool(1);
let out = pool
.embed(vec![], RequestPriority::Background)
.await
.expect("empty embed is a no-op");
assert!(out.is_empty());
}
#[tokio::test]
async fn pool_creates_n_workers() {
let pool = make_pool(3);
assert_eq!(pool.workers(), 3);
}
#[tokio::test]
#[serial_test::serial(env_workers)]
async fn autotune_worker_count_matches_table() {
std::env::remove_var("TRUSTY_EMBED_WORKERS");
let n = autotune_workers();
assert!(
n == 1 || n == 2 || n == 4,
"autotune returned unexpected count: {n}"
);
}
#[tokio::test]
#[serial_test::serial(env_workers)]
async fn pool_autotune_respects_env_override() {
std::env::set_var("TRUSTY_EMBED_WORKERS", "7");
let n = autotune_workers();
std::env::remove_var("TRUSTY_EMBED_WORKERS");
assert_eq!(n, 7);
}
#[tokio::test]
async fn priority_ordering_interactive_drains_first() {
let pool = make_pool(1);
let interactive = pool
.embed(vec!["i".into()], RequestPriority::Interactive)
.await
.expect("interactive embed succeeds");
let background = pool
.embed(vec!["b".into()], RequestPriority::Background)
.await
.expect("background embed succeeds");
assert_eq!(interactive.len(), 1);
assert_eq!(background.len(), 1);
}
#[tokio::test]
async fn dropping_pool_shuts_workers_down() {
let pool = make_pool(1);
drop(pool);
tokio::time::sleep(Duration::from_millis(50)).await;
}
#[tokio::test]
async fn dropping_pool_after_send_returns_error() {
let pool = make_pool(1);
pool.embed(vec!["warmup".into()], RequestPriority::Interactive)
.await
.expect("warmup embed on live pool must succeed");
let (interactive_tx, interactive_rx) = mpsc::channel::<EmbedRequest>(1);
drop(interactive_rx);
let (_background_tx, _background_rx) = mpsc::channel::<EmbedRequest>(1);
drop(_background_rx);
let closed_pool = EmbedPool {
interactive_tx,
background_tx: _background_tx,
workers: 0,
in_flight: Arc::new(AtomicUsize::new(0)),
_worker_threads: vec![],
stall_tracker: None,
};
let result = closed_pool
.embed(vec!["x".into()], RequestPriority::Interactive)
.await;
assert!(
result.is_err(),
"embed on a closed pool must return Err, not hang"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn embed_pool_isolation_concurrent_task_not_blocked() {
use std::sync::atomic::AtomicBool;
struct SlowEmbedder {
dim: usize,
delay: Duration,
}
#[async_trait::async_trait]
impl crate::core::embed::Embedder for SlowEmbedder {
async fn embed(&self, _text: &str) -> anyhow::Result<Vec<f32>> {
tokio::time::sleep(self.delay).await;
Ok(vec![0.1f32; self.dim])
}
async fn embed_batch(&self, texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>> {
tokio::time::sleep(self.delay).await;
Ok(texts.iter().map(|_| vec![0.1f32; self.dim]).collect())
}
fn dimension(&self) -> usize {
self.dim
}
}
let embedder: Arc<dyn Embedder> = Arc::new(SlowEmbedder {
dim: 8,
delay: Duration::from_millis(400),
});
let pool = Arc::new(EmbedPool::new(1, embedder));
let timer_done = Arc::new(AtomicBool::new(false));
let timer_done_clone = Arc::clone(&timer_done);
let timer_handle = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
timer_done_clone.store(true, Ordering::SeqCst);
});
let pool_clone = Arc::clone(&pool);
let embed_handle = tokio::spawn(async move {
pool_clone
.embed(vec!["slow".into()], RequestPriority::Background)
.await
.expect("slow embed should succeed")
});
let timer_start = std::time::Instant::now();
timer_handle.await.expect("timer task should not panic");
let timer_elapsed = timer_start.elapsed();
assert!(
timer_elapsed < Duration::from_millis(300),
"Timer task took {:?} — embed worker should be isolated on dedicated \
OS thread and not block the caller's scheduler (issue #1017 fix)",
timer_elapsed
);
assert!(
timer_done.load(Ordering::SeqCst),
"Timer flag was not set — task did not complete before assertion"
);
let result = embed_handle.await.expect("embed task should not panic");
assert_eq!(result.len(), 1);
assert_eq!(result[0].len(), 8);
}