#[cfg(feature = "integration")]
mod integration {
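    // Soak-style integration test: starts a streaming backend engine and a load
    // client on the same distributed runtime, fires batches of requests for a
    // configurable duration, and reports how many requests the backend handled.
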
    pub const DEFAULT_NAMESPACE: &str = "dynamo";

    use dynamo_runtime::{
        DistributedRuntime, Runtime, Worker,
        config::environment_names::testing as env_testing,
        logging,
        pipeline::{
            AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
            SingleIn, async_trait, network::Ingress,
        },
        protocols::annotated::Annotated,
        stream,
    };

    use anyhow::{Context, Result};
    use futures::StreamExt;
    use std::{
        sync::Arc,
        sync::atomic::{AtomicU64, Ordering},
        time::Duration,
    };
    use tokio::time::Instant;
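
    /// Test entry point: initializes logging, builds a `Worker` from environment
    /// settings, and drives the async `app`.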
    #[test]
    fn main() -> Result<()> {
        logging::init();
        let worker = Worker::from_settings()?;
        worker.execute(app)
    }
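
    /// Spawns the backend and client as concurrent tasks, waits for the client to
    /// finish, shuts the runtime down, and prints the backend's final request count.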
    async fn app(runtime: Runtime) -> Result<()> {
        let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
        let server = tokio::spawn(backend(distributed.clone()));
        let client = tokio::spawn(client(distributed.clone()));
        client.await??;
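        // The client is done; shut the runtime down so the backend's endpoint loop
        // exits and the server task can return its handler.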
        distributed.shutdown();
        let handler = server.await??;
        let final_count = handler.backend_counter.load(Ordering::Relaxed);
        println!(
            "Final RequestHandler backend_counter: {} requests processed",
            final_count
        );
        Ok(())
    }
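
    /// Engine used by the backend: echoes each request back one character at a
    /// time and counts the requests it has processed. `queued_up_processing`
    /// selects between a delayed async stream and an immediately-ready one.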
    struct RequestHandler {
        backend_counter: AtomicU64,
        queued_up_processing: bool,
    }

    impl RequestHandler {
        fn new(queued_up_processing: bool) -> Arc<Self> {
            Arc::new(Self {
                backend_counter: AtomicU64::new(0),
                queued_up_processing,
            })
        }
    }

    #[async_trait]
    impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
        async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
            let (data, ctx) = input.into_parts();
            self.backend_counter.fetch_add(1, Ordering::Relaxed);
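            // Split the request into per-character annotated chunks to stream back.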
            let chars = data
                .chars()
                .map(|c| Annotated::from_data(c.to_string()))
                .collect::<Vec<_>>();
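            // Queued mode trickles the chunks out with a 100 ms delay per character;
            // otherwise the whole response is exposed as an already-complete stream.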
            if self.queued_up_processing {
                let async_stream = async_stream::stream! {
                    for c in chars {
                        yield c;
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                };
                Ok(ResponseStream::new(Box::pin(async_stream), ctx.context()))
            } else {
                let iter_stream = stream::iter(chars);
                Ok(ResponseStream::new(Box::pin(iter_stream), ctx.context()))
            }
        }
    }
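
    /// Registers a `RequestHandler` on the `backend/generate` endpoint of the
    /// default namespace and serves it until the runtime shuts down, then returns
    /// the handler so the caller can inspect its counter.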
    async fn backend(runtime: DistributedRuntime) -> Result<Arc<RequestHandler>> {
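        // DYN_QUEUED_UP_PROCESSING toggles the handler's delayed-streaming mode;
        // it defaults to false when unset or unparsable.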
        let queued_up_processing =
            std::env::var(env_testing::DYN_QUEUED_UP_PROCESSING).unwrap_or("false".to_string());
        let queued_up_processing: bool = queued_up_processing.parse().unwrap_or(false);
        let handler = RequestHandler::new(queued_up_processing);
        let ingress = Ingress::for_engine(handler.clone())?;
        let component = runtime.namespace(DEFAULT_NAMESPACE)?.component("backend")?;
        component
            .endpoint("generate")
            .endpoint_builder()
            .handler(ingress)
            .start()
            .await?;
        Ok(handler)
    }
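
    /// Drives load against the backend: for a configurable duration, spawns batches
    /// of concurrent requests through a `PushRouter` and drains every response stream.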
    async fn client(runtime: DistributedRuntime) -> Result<()> {
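        // DYN_SOAK_RUN_DURATION (default 3s) bounds the run; DYN_SOAK_BATCH_LOAD
        // (default 100) sets how many requests are in flight per batch.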
        let run_duration =
            std::env::var(env_testing::DYN_SOAK_RUN_DURATION).unwrap_or("3s".to_string());
        let run_duration =
            humantime::parse_duration(&run_duration).unwrap_or(Duration::from_secs(3));
        let batch_load =
            std::env::var(env_testing::DYN_SOAK_BATCH_LOAD).unwrap_or("100".to_string());
        let batch_load: usize = batch_load.parse().unwrap_or(100);

        let client = runtime
            .namespace(DEFAULT_NAMESPACE)?
            .component("backend")?
            .endpoint("generate")
            .client()
            .await?;
        client.wait_for_instances().await?;
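
        // Route each request to a randomly chosen backend instance.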
        let router =
            PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
                .await?;
        let router = Arc::new(router);

        let start = Instant::now();
        let mut count = 0;
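        // Each iteration fires `batch_load` concurrent requests, waits for every
        // stream to drain, and repeats until the run duration has elapsed.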
        loop {
            let mut tasks = Vec::new();
            for _ in 0..batch_load {
                let router = router.clone();
                tasks.push(tokio::spawn(async move {
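                    // Guard the initial dispatch and each streamed item with timeouts
                    // so a hung backend fails the task instead of stalling the test.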
                    let mut stream = tokio::time::timeout(
                        Duration::from_secs(5),
                        router.random("hello world".to_string().into()),
                    )
                    .await
                    .context("request timed out")??;
                    while let Some(_resp) =
                        tokio::time::timeout(Duration::from_secs(30), stream.next())
                            .await
                            .context("stream timed out")?
                    {}
                    Ok::<(), Error>(())
                }));
            }
            for task in tasks.into_iter() {
                task.await??;
            }
            let elapsed = start.elapsed();
            count += batch_load;
            if count % 1000 == 0 {
                println!("elapsed: {:?}; count: {}", elapsed, count);
            }
            if elapsed > run_duration {
                println!("done");
                break;
            }
        }
        Ok(())
    }
}