use std::collections::HashMap;
use std::time::Duration;
use std::time::Instant;
use dag_flow::context::Context;
use dag_flow::engine::Engine;
use dag_flow::task::Input;
use dag_flow::task::Task;
use futures::executor;
use futures::future::OptionFuture;
use futures_timer::Delay;
/// Raw byte buffer used as the output type of every task defined in this file.
type Bytes = Vec<u8>;
/// Builds an engine from tasks `A`, `B` and `C`, runs it to completion, and
/// checks both how long the run took and what `C` left in the context.
///
/// # Panics
///
/// Panics if the engine cannot be built, if the run does not take 2 or 3
/// whole seconds, or if the value stored for `"C"` is not `"C's output"`.
fn main() {
    // Register all three tasks, then freeze the graph into an engine.
    let builder = Engine::builder();
    builder.add_task(A).add_task(B).add_task(C);
    let engine = builder.build().unwrap();

    // The engine receives a clone; the original handle is kept so the
    // results can be read back once the run has finished.
    let context = Context::new();

    // Time the whole run, truncated to whole seconds.
    let started = Instant::now();
    executor::block_on(engine.run(context.clone()));
    let elapsed = started.elapsed().as_secs();

    // NOTE(review): presumably 2s is the A -> C chain alone and 3s is the run
    // in which C also awaits B (C does so with probability 0.5) — confirm
    // against the engine's scheduling rules.
    assert!(elapsed == 2 || elapsed == 3);

    // Look up C's result; a missing entry collapses to the default value.
    let c_output = executor::block_on(async {
        OptionFuture::from(context.get(&"C".into()))
            .await
            .unwrap_or_default()
    });
    assert_eq!(c_output, Some("C's output".into()));
}
struct A;
impl Task<String, Bytes> for A {
fn id(&self) -> String {
"A".into()
}
async fn run(&self, _: HashMap<String, Input<'_, Bytes>>) -> Option<Bytes> {
Delay::new(Duration::from_secs(1)).await;
Some("A's output".into())
}
}
struct B;
impl Task<String, Bytes> for B {
fn id(&self) -> String {
"B".into()
}
fn is_auto(&self) -> bool {
false
}
async fn run(&self, _: HashMap<String, Input<'_, Bytes>>) -> Option<Bytes> {
Delay::new(Duration::from_secs(3)).await;
Some("B's output".into())
}
}
struct C;
impl Task<String, Bytes> for C {
fn id(&self) -> String {
"C".into()
}
fn dependencies(&self) -> Vec<String> {
vec!["A".into(), "B".into()]
}
async fn run(&self, inputs: HashMap<String, Input<'_, Bytes>>) -> Option<Bytes> {
futures::join!(
async {
let _output_a = inputs["A"].clone().await;
Delay::new(Duration::from_secs(1)).await;
},
async {
if rand::random_bool(0.5) {
let _output_b = inputs["B"].clone().await;
}
}
);
Some("C's output".into())
}
}