#![cfg(all(feature = "embeddings", feature = "openai-embeddings"))]
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinSet;
use vecstore::embeddings::openai_backend::{OpenAIEmbedding, OpenAIModel};
/// Builds an [`OpenAIEmbedding`] for `model` using the fixed placeholder key
/// `"test-api-key"`.
///
/// # Panics
///
/// Panics if the value produced by awaiting `OpenAIEmbedding::new` cannot be
/// unwrapped.
async fn create_embedder(model: OpenAIModel) -> OpenAIEmbedding {
    let api_key = String::from("test-api-key");
    let constructed = OpenAIEmbedding::new(api_key, model).await;
    constructed.unwrap()
}
/// Spawns 100 tasks that each call `estimate_cost` with one short text on a
/// shared embedder, then checks every individual estimate and their sum.
#[tokio::test]
async fn test_concurrent_cost_estimation() {
    let shared = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);

    // Every task is spawned up front (via `collect`) before any is awaited.
    let tasks: Vec<_> = (0..100)
        .map(|idx| {
            let embedder = Arc::clone(&shared);
            tokio::spawn(async move {
                let text = format!("Test text {}", idx);
                embedder.estimate_cost(&[text.as_str()])
            })
        })
        .collect();

    let mut running_total = 0.0;
    for task in tasks {
        let estimate = task.await.unwrap();
        assert!(estimate >= 0.0, "Cost should be non-negative");
        assert!(estimate.is_finite(), "Cost should be finite");
        running_total += estimate;
    }

    assert!(running_total > 0.0, "Total cost should be positive");
    assert!(running_total < 1.0, "Total cost should be reasonable");
}
/// Reads the model's dimension, per-million-token cost and name from 1000
/// spawned tasks sharing one embedder, and checks each task saw the expected
/// values for `TextEmbedding3Small`.
#[tokio::test]
async fn test_concurrent_model_access() {
    let shared = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);

    let tasks: Vec<_> = (0..1000)
        .map(|_| {
            let embedder = Arc::clone(&shared);
            tokio::spawn(async move {
                // `model()` is called once per accessor, as three separate reads.
                (
                    embedder.model().dimension(),
                    embedder.model().cost_per_million_tokens(),
                    embedder.model().as_str(),
                )
            })
        })
        .collect();

    for task in tasks {
        let (dims, price, label) = task.await.unwrap();
        assert_eq!(dims, 1536);
        assert_eq!(price, 0.02);
        assert_eq!(label, "text-embedding-3-small");
    }
}
/// Spawns 50 tasks that each build their own embedder, apply a per-task
/// `with_rate_limit` / `with_max_retries` configuration, and estimate the cost
/// of a single text.
#[tokio::test]
async fn test_concurrent_builder_pattern() {
    let mut tasks = Vec::new();

    for i in 0..50 {
        tasks.push(tokio::spawn(async move {
            let base = create_embedder(OpenAIModel::TextEmbedding3Small).await;
            // Each task uses a different builder configuration derived from `i`.
            let configured = base.with_rate_limit(100 + i).with_max_retries(i % 10);
            configured.estimate_cost(&["test"])
        }));
    }

    for task in tasks {
        let cost = task.await.unwrap();
        assert!(cost >= 0.0);
    }
}
/// Runs 20 concurrent cost estimations against an embedder configured with
/// `with_rate_limit(10)` and checks that all of them finish within 5 seconds.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_rate_limiter_concurrent_requests() {
    let limited = create_embedder(OpenAIModel::TextEmbedding3Small)
        .await
        .with_rate_limit(10);
    let shared = Arc::new(limited);

    // The clock starts before the first task is spawned.
    let started_at = Instant::now();

    let tasks: Vec<_> = (0..20)
        .map(|idx| {
            let embedder = Arc::clone(&shared);
            tokio::spawn(async move {
                let text = format!("Request {}", idx);
                embedder.estimate_cost(&[text.as_str()])
            })
        })
        .collect();

    for task in tasks {
        let cost = task.await.unwrap();
        assert!(cost >= 0.0);
    }

    let took = started_at.elapsed();
    assert!(
        took < Duration::from_secs(5),
        "Should complete reasonably fast for cost estimation"
    );
}
#[tokio::test]
async fn test_concurrent_different_models() {
let small = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);
let large = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Large).await);
let ada = Arc::new(create_embedder(OpenAIModel::Ada002).await);
let mut handles = vec![];
for i in 0..30 {
let embedder = match i % 3 {
0 => Arc::clone(&small),
1 => Arc::clone(&large),
_ => Arc::clone(&ada),
};
let handle = tokio::spawn(async move {
let text = format!("Text {}", i);
embedder.estimate_cost(&[text.as_str()])
});
handles.push(handle);
}
let mut costs = vec![];
for handle in handles {
costs.push(handle.await.unwrap());
}
for cost in costs {
assert!(cost >= 0.0);
assert!(cost.is_finite());
}
}
/// Estimates costs concurrently for batches of 1, 10, 100 and 1000 texts,
/// spawning ten tasks per batch size on a shared embedder.
#[tokio::test]
async fn test_concurrent_batch_cost_estimation() {
    let shared = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);

    // Ten consecutive tasks per batch size, in the order the sizes are listed.
    let tasks: Vec<_> = [1, 10, 100, 1000]
        .iter()
        .flat_map(|&size| std::iter::repeat(size).take(10))
        .map(|size| {
            let embedder = Arc::clone(&shared);
            tokio::spawn(async move {
                let owned: Vec<String> = (0..size).map(|i| format!("Text {}", i)).collect();
                let borrowed: Vec<&str> = owned.iter().map(String::as_str).collect();
                embedder.estimate_cost(&borrowed)
            })
        })
        .collect();

    for task in tasks {
        let cost = task.await.unwrap();
        assert!(cost >= 0.0);
        assert!(cost.is_finite());
    }
}
/// Spawns 1000 cost-estimation tasks into a `JoinSet` on an 8-worker runtime
/// and checks that every one completes with a non-negative, finite cost.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_high_concurrency_stress() {
    let shared = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);
    let mut set = JoinSet::new();

    (0..1000).for_each(|i| {
        let embedder = Arc::clone(&shared);
        set.spawn(async move {
            let text = format!("Stress test {}", i);
            embedder.estimate_cost(&[text.as_str()])
        });
    });

    // Drain the set in completion order, counting finished tasks.
    let mut completed = 0;
    loop {
        match set.join_next().await {
            Some(joined) => {
                let cost = joined.unwrap();
                assert!(cost >= 0.0);
                assert!(cost.is_finite());
                completed += 1;
            }
            None => break,
        }
    }

    assert_eq!(completed, 1000, "All tasks should complete");
}
/// Interleaves four kinds of operation across 100 tasks on a shared embedder:
/// single-text cost estimation, a dimension read, three-text cost estimation,
/// and a per-million-token cost read. Every task must yield a value >= 0.
#[tokio::test]
async fn test_concurrent_mixed_operations() {
    let shared = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);

    let tasks: Vec<_> = (0..100)
        .map(|i| {
            let embedder = Arc::clone(&shared);
            tokio::spawn(async move {
                let kind = i % 4;
                if kind == 0 {
                    embedder.estimate_cost(&["test"])
                } else if kind == 1 {
                    // The dimension is read only for the access itself; the
                    // task reports 0.0 so all branches share one result type.
                    let _ = embedder.model().dimension();
                    0.0
                } else if kind == 2 {
                    let texts = vec!["a", "b", "c"];
                    embedder.estimate_cost(&texts)
                } else {
                    embedder.model().cost_per_million_tokens()
                }
            })
        })
        .collect();

    for task in tasks {
        let result = task.await.unwrap();
        assert!(result >= 0.0);
    }
}
/// Calls `estimate_cost` with an empty batch from 100 concurrent tasks and
/// expects each call to return exactly `0.0`.
#[tokio::test]
async fn test_concurrent_empty_batch() {
    let shared = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);

    let tasks: Vec<_> = (0..100)
        .map(|_| {
            let embedder = Arc::clone(&shared);
            tokio::spawn(async move {
                let no_texts: Vec<&str> = Vec::new();
                embedder.estimate_cost(&no_texts)
            })
        })
        .collect();

    for task in tasks {
        let estimate = task.await.unwrap();
        assert_eq!(estimate, 0.0, "Empty batch should have zero cost");
    }
}
/// Spawns 20 tasks that each estimate the cost of a 5000-element batch of the
/// text `"sample"`, and checks that awaiting all of them takes under 10 seconds.
#[tokio::test]
async fn test_concurrent_large_batches() {
    let shared = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);

    let tasks: Vec<_> = (0..20)
        .map(|_| {
            let embedder = Arc::clone(&shared);
            tokio::spawn(async move {
                let batch: Vec<&str> = std::iter::repeat("sample").take(5000).collect();
                embedder.estimate_cost(&batch)
            })
        })
        .collect();

    // The clock starts after the tasks have been spawned, just before joining.
    let join_started = Instant::now();
    for task in tasks {
        let estimate = task.await.unwrap();
        assert!(estimate > 0.0);
        assert!(estimate.is_finite());
    }

    let took = join_started.elapsed();
    assert!(
        took < Duration::from_secs(10),
        "Should complete in reasonable time"
    );
}
/// Has 500 concurrent tasks each estimate the cost of the same text twice and
/// asserts that both estimates are equal within a task.
#[tokio::test]
async fn test_no_data_races() {
    let shared = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);

    let tasks: Vec<_> = (0..500)
        .map(|i| {
            let embedder = Arc::clone(&shared);
            tokio::spawn(async move {
                let text = format!("Text {}", i);
                // The same one-element input is passed to both calls.
                let input = [text.as_str()];
                let first = embedder.estimate_cost(&input);
                let second = embedder.estimate_cost(&input);
                assert_eq!(first, second, "Cost estimation should be deterministic");
                first
            })
        })
        .collect();

    for task in tasks {
        let cost = task.await.unwrap();
        assert!(cost >= 0.0);
    }
}
/// Spawns 5000 cost-estimation tasks into a `JoinSet` on a 16-worker runtime,
/// checks every result, and requires the whole run (spawning plus joining) to
/// finish within 30 seconds.
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn test_extreme_concurrency() {
    let embedder = Arc::new(create_embedder(OpenAIModel::TextEmbedding3Small).await);
    let mut set = JoinSet::new();

    // Start the clock before spawning: on the multi-threaded runtime, tasks
    // begin executing as soon as they are spawned, so timing only the join
    // loop would leave out the spawn phase and any work completed during it.
    let start = Instant::now();

    for i in 0..5000 {
        let embedder_clone = Arc::clone(&embedder);
        set.spawn(async move {
            let text = format!("Extreme test {}", i);
            embedder_clone.estimate_cost(&[text.as_str()])
        });
    }

    let mut count = 0;
    while let Some(result) = set.join_next().await {
        let cost = result.unwrap();
        assert!(cost >= 0.0);
        // Same finiteness check the 1000-task stress test performs.
        assert!(cost.is_finite());
        count += 1;
    }
    let elapsed = start.elapsed();

    assert_eq!(count, 5000);
    println!("Extreme concurrency test completed in {:?}", elapsed);
    assert!(
        elapsed < Duration::from_secs(30),
        "Should complete in reasonable time"
    );
}
// Placeholder module compiled only under `cfg(test)`; it currently declares
// no items.
#[cfg(test)]
mod concurrency_test_summary {
}