#![allow(dead_code)]
use anyhow::Error;
use futures::{StreamExt, stream};
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use dynamo_runtime::engine::ResponseStream;
use dynamo_runtime::pipeline::{
AsyncEngine,
Data,
Event,
ManyOut,
Operator,
ServiceBackend,
ServiceEngine,
ServiceFrontend,
SingleIn,
*, };
mod common;
use common::engines::{AsyncGenerator, LlmdbaEngine as LambdaEngine};
use common::mock;
/// Item type carried on the response streams built in this file.
///
/// Only `Data` and `Comment` are constructed by the code visible here; the
/// remaining variants are declared but not exercised by these tests.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Annotated<T: Data> {
    /// A payload value of type `T`.
    Data(T),
    /// Wraps an `Event` as imported from `dynamo_runtime::pipeline`.
    Event(Event),
    /// Free-form text annotation; `PreprocesOperator` emits one describing the request.
    Comment(String),
    /// Error text. NOTE(review): presumably a stringified failure — not produced in this file.
    Error(String),
    /// Unit marker. NOTE(review): presumably an end-of-stream sentinel — not produced in this file.
    End,
}
/// Field-less operator type used by the operator pipeline test in this file.
///
/// Its `Operator` implementation prepends a `Comment` to the response stream
/// and appends `" from operator"` to the request before forwarding it.
struct PreprocesOperator {}
// Operator over `String` requests and `Annotated<String>` response streams,
// with identical upstream and downstream types.
#[async_trait::async_trait]
impl
    Operator<
        SingleIn<String>,
        ManyOut<Annotated<String>>,
        SingleIn<String>,
        ManyOut<Annotated<String>>,
    > for PreprocesOperator
{
    /// Forwards `req` to `next` with `" from operator"` appended, and returns
    /// the downstream responses preceded by one `Annotated::Comment` that
    /// holds the debug rendering of the original request.
    ///
    /// Any error from `next.generate` is propagated unchanged.
    async fn generate(
        &self,
        req: SingleIn<String>,
        next: Arc<dyn AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error>>,
    ) -> Result<ManyOut<Annotated<String>>, Error> {
        // Render the comment first: `req` is consumed by `map` below.
        let banner = Annotated::<String>::Comment(format!(
            "PreprocessOperator: {:?}",
            req
        ));
        let banner_stream = stream::iter(std::iter::once(banner));

        let forwarded = req.map(|text| format!("{} from operator", text));
        let downstream = next.generate(forwarded).await?;

        // Capture the context before the downstream stream is moved into the chain.
        let ctx = downstream.context();
        Ok(ResponseStream::new(
            Box::pin(banner_stream.chain(downstream)),
            ctx,
        ))
    }
}
/// Builds the backend engine used by the tests: for each character of the
/// request it emits one `Annotated::Data` holding that character as a
/// `String`, then sleeps 10 ms before the next one.
///
/// Generation stops silently as soon as an `emit` call returns an error.
fn make_backend_engine() -> ServiceEngine<SingleIn<String>, ManyOut<Annotated<String>>> {
    let generator =
        AsyncGenerator::<String, Annotated<String>>::new(|(req, stream)| async move {
            let letters: Vec<char> = req.chars().collect();
            for letter in letters {
                let emitted = stream.emit(Annotated::Data(letter.to_string())).await;
                if emitted.is_err() {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        });
    LambdaEngine::from_generator(generator)
}
/// Links a frontend directly to the backend engine and back, then checks that
/// the request `"test"` yields four stream items (one per character).
#[tokio::test]
async fn test_service_source_sink() {
    let frontend = ServiceFrontend::<SingleIn<String>, ManyOut<Annotated<String>>>::new();
    let backend = ServiceBackend::from_engine(make_backend_engine());
    let service = frontend.link(backend).unwrap().link(frontend).unwrap();

    let mut responses = service.generate("test".to_string().into()).await.unwrap();

    // Drain the stream, counting every item regardless of variant.
    let mut received = 0;
    while responses.next().await.is_some() {
        received += 1;
    }

    assert_eq!(received, 4);
}
/// Builds a request-side pipeline node that appends `" world"` to the request string.
fn make_preprocessor() -> Arc<PipelineNode<SingleIn<String>, SingleIn<String>>> {
    PipelineNode::<SingleIn<String>, SingleIn<String>>::new(Box::new(|request| {
        let extended = request.map(|text| format!("{} world", text));
        Ok(extended)
    }))
}
/// Builds a response-side pipeline node that emits every incoming item twice
/// in a row, keeping the incoming stream's context on the result.
#[allow(clippy::type_complexity)]
fn make_postprocessor() -> Arc<PipelineNode<ManyOut<Annotated<String>>, ManyOut<Annotated<String>>>>
{
    PipelineNode::<ManyOut<Annotated<String>>, ManyOut<Annotated<String>>>::new(Box::new(
        |responses| {
            // Capture the context before `flat_map` consumes the stream.
            let ctx = responses.context();
            let duplicated = responses.flat_map(|item| stream::iter([item.clone(), item]));
            Ok(ResponseStream::new(Box::pin(duplicated), ctx))
        },
    ))
}
/// Assembles the pipeline
/// frontend -> preprocessor -> backend -> postprocessor -> frontend.
///
/// Returns the first `PipelineError` produced by any `link` call.
fn make_service()
-> Result<ServiceEngine<SingleIn<String>, ManyOut<Annotated<String>>>, PipelineError> {
    let frontend = ServiceFrontend::<SingleIn<String>, ManyOut<Annotated<String>>>::new();
    let backend = ServiceBackend::from_engine(make_backend_engine());
    let request_stage = make_preprocessor();
    let response_stage = make_postprocessor();

    Ok(frontend
        .link(request_stage)?
        .link(backend)?
        .link(response_stage)?
        .link(frontend)?)
}
/// Runs `"test"` through the pipeline from `make_service` and expects 20 items:
/// the preprocessor turns the request into `"test world"` (10 characters), the
/// backend emits one item per character, and the postprocessor doubles each item.
#[tokio::test]
async fn test_service_source_node_sink() {
    let pipeline = make_service().unwrap();
    let mut responses = pipeline.generate("test".to_string().into()).await.unwrap();

    let mut received = 0;
    while responses.next().await.is_some() {
        received += 1;
    }

    assert_eq!(received, 20);
}
/// Ignored scaffold for a two-node (disaggregated) pipeline.
///
/// It only constructs the two half-pipelines and a mock egress/ingress pair;
/// nothing is attached or executed, so the resulting bindings stay unused
/// (hence the `unused_variables` expectation).
#[tokio::test]
#[ignore = "Blocked by AsyncEngineStream trait missing Sync supertrait"]
#[expect(unused_variables)]
async fn test_disaggregated_service() {
    println!("Running test_disaggregated_service");

    // Node 0: frontend -> segment sink -> postprocessor -> frontend.
    let frontend = ServiceFrontend::<SingleIn<String>, ManyOut<Annotated<String>>>::new();
    let node0_sink = SegmentSink::<SingleIn<String>, ManyOut<Annotated<String>>>::new();
    let doubler = make_postprocessor();
    let node0_service = frontend
        .link(node0_sink.clone())
        .unwrap()
        .link(doubler)
        .unwrap()
        .link(frontend)
        .unwrap();

    // Node 1: segment source -> preprocessor -> backend -> segment source.
    let node1_source = SegmentSource::<SingleIn<String>, ManyOut<Annotated<String>>>::new();
    let appender = make_preprocessor();
    let engine = ServiceBackend::from_engine(make_backend_engine());
    let node1_service = node1_source
        .link(appender)
        .unwrap()
        .link(engine)
        .unwrap()
        .link(node1_source.clone())
        .unwrap();

    // Mock transport endpoints that would connect the two nodes.
    let (egress, ingress) = mock::MockNetworkTransport::<
        SingleIn<String>,
        ManyOut<Annotated<String>>,
    >::new_egress_ingress(mock::MockNetworkOptions::default());

    println!(
        "Test blocked: SegmentSink::attach requires Arc<dyn AsyncEngine> but AsyncEngineStream cannot be Sync"
    );
}
/// Assembles the pipeline
/// frontend -> preprocessor -> operator (forward edge) -> backend
/// -> postprocessor -> operator (backward edge) -> frontend,
/// where the operator wraps a `PreprocesOperator`.
///
/// Returns the first `PipelineError` produced by any `link` call.
fn make_service_with_operator()
-> Result<ServiceEngine<SingleIn<String>, ManyOut<Annotated<String>>>, PipelineError> {
    let frontend = ServiceFrontend::<SingleIn<String>, ManyOut<Annotated<String>>>::new();
    let backend = ServiceBackend::from_engine(make_backend_engine());
    let request_stage = make_preprocessor();
    let response_stage = make_postprocessor();
    let op = PipelineOperator::new(Arc::new(PreprocesOperator {}));

    Ok(frontend
        .link(request_stage)?
        .link(op.forward_edge())?
        .link(backend)?
        .link(response_stage)?
        .link(op.backward_edge())?
        .link(frontend)?)
}
/// Runs `"test"` through the operator pipeline and tallies items by variant.
///
/// Expected: 48 `Data` items, because the backend sees
/// `"test world from operator"` (24 characters) and the postprocessor doubles
/// each item; plus exactly one `Comment`, the one `PreprocesOperator` prepends.
#[tokio::test]
async fn test_service_source_node_sink_with_operator() {
    let pipeline = make_service_with_operator().unwrap();
    let mut responses = pipeline.generate("test".to_string().into()).await.unwrap();

    let mut data_items = 0;
    let mut comments = 0;
    while let Some(item) = responses.next().await {
        // Variants other than `Data` and `Comment` are ignored.
        if matches!(item, Annotated::Data(_)) {
            data_items += 1;
        } else if matches!(item, Annotated::Comment(_)) {
            comments += 1;
        }
    }

    assert_eq!(comments, 1);
    assert_eq!(data_items, 48);
}