#[path = "../common/mod.rs"] mod common;
use common::*;
use reactive_mutiny::prelude::advanced::*;
use std::{
sync::Arc,
time::Duration,
fmt::Debug,
};
use futures::{Stream, stream, StreamExt};
// Order decisions emitted by `DecisionMaker::decider` and delivered to every registered listener.
// NOTE(review): `#[def = ...]` presumably tells the `defaults::Defaults` derive to use
// `Buy(Order::default())` as this enum's `Default` value -- confirm against the `defaults` crate.
#[derive(Debug, defaults::Defaults)]
#[def = "Buy(Order::default())"]
enum OrderEvent {
// Request to buy the quantity stated in the wrapped `Order`.
Buy(Order),
// Request to sell the quantity stated in the wrapped `Order`.
Sell(Order),
}
/// Payload carried by an `OrderEvent`: how much is to be traded.
#[derive(Debug, Default)]
struct Order {
/// Amount to buy or sell; `DecisionMaker::decider` buys 100 and sells whatever it currently holds.
quantity: u32,
}
/// First const-generic argument given to the order-events channel & handler types in this file.
// NOTE(review): presumably the capacity of the events buffer -- confirm in the `reactive_mutiny` docs.
const BUFFER_SIZE: usize = 1024;
/// Second const-generic argument given to the order-events channel & handler types in this file.
// NOTE(review): presumably the maximum number of simultaneous listener streams -- confirm in the `reactive_mutiny` docs.
const MAX_STREAMS: usize = 16;
/// Stream type received by the `pipeline_builder` closures passed to `DecisionMaker::add_listener`.
// NOTE(review): the last type argument suggests listeners receive each event as `Arc<OrderEvent>` -- confirm.
type MultiStreamType = MutinyStream<'static, OrderEvent, ChannelMultiArcAtomic<OrderEvent, BUFFER_SIZE, MAX_STREAMS>, Arc<OrderEvent>>;
/// Turns incoming `AnalysisEvent`s into `OrderEvent`s and hands them over to the registered listeners.
struct DecisionMaker {
/// Handler through which `OrderEvent`s are sent and on which the listeners' executors are spawned.
orders_event_handler: MultiAtomicArc<OrderEvent, BUFFER_SIZE, MAX_STREAMS>,
}
impl DecisionMaker {

    /// Creates a `DecisionMaker` with a fresh order-events handler and no listeners.
    pub fn new() -> Self {
        Self {
            orders_event_handler: Multi::new("Order events Handler"),
        }
    }

    /// Registers a new listener for the `OrderEvent`s produced by this `DecisionMaker`.
    ///
    /// * `listener_name` -- used to compose the name given to the spawned executor;
    /// * `pipeline_builder` -- receives the stream of order events and returns the
    ///   stream (pipeline) the executor will drive.
    ///
    /// # Errors
    ///
    /// Any error reported when spawning the executor is returned as a boxed message
    /// containing the `Debug` rendering of the original error.
    pub async fn add_listener<IntoString:    Into<String>,
                              OutItemType:   Send + Debug,
                              OutStreamType: Stream<Item=OutItemType> + Send + 'static>
                             (&self,
                              listener_name:    IntoString,
                              pipeline_builder: impl FnOnce(MultiStreamType) -> OutStreamType)
                             -> Result<(), Box<dyn std::error::Error>> {
        let executor_name = format!("`OrderEvent`s listener '{}'", listener_name.into());
        let spawn_outcome = self.orders_event_handler
            .spawn_non_futures_non_fallible_executor(1, executor_name, pipeline_builder, |_| async {})
            .await;
        match spawn_outcome {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::from(format!("Error adding an `OrderEvent`s listener to the `DecisionMaker`: {:?}", err))),
        }
    }

    /// Builds the stream that consumes `analysis_events_stream` and, as a side effect of being
    /// polled, sends the resulting `OrderEvent`s to the listeners:
    ///
    /// * nothing held and a positive `price_delta` -- buys 100;
    /// * something held and a negative `price_delta` -- sells everything held;
    /// * anything else -- no order is issued.
    ///
    /// One `()` is yielded for every analysis event consumed, whether or not an order was sent.
    fn decider<'a>(&'a mut self, analysis_events_stream: impl Stream<Item=AnalysisEvent> + 'a) -> impl Stream<Item=()> + 'a {
        // quantity currently held -- lives inside the closure for as long as the stream does
        let mut open_position: u32 = 0;
        analysis_events_stream.map(move |analysis| {
            let rising  = analysis.price_delta > 0.00;
            let falling = analysis.price_delta < 0.00;
            let decision = match (open_position, rising, falling) {
                (0, true, _) => {
                    open_position = 100;
                    Some(OrderEvent::Buy(Order { quantity: 100 }))
                },
                (quantity, _, true) if quantity > 0 => {
                    open_position = 0;
                    Some(OrderEvent::Sell(Order { quantity }))
                },
                _ => None,
            };
            if let Some(order) = decision {
                // a failed send is retried, spinning, until it is accepted
                self.orders_event_handler.send_with(|slot| *slot = order)
                    .retry_with(|setter| self.orders_event_handler.send_with(setter))
                    .spinning_forever();
            }
        })
    }

    /// Closes the order-events handler, passing `Duration::ZERO` as the timeout argument.
    pub async fn close(&self) {
        let timeout = Duration::ZERO;
        self.orders_event_handler.close(timeout).await;
    }
}
/// Demonstrates a `DecisionMaker` with two listeners ("Sender" and "Logger"):
/// a fixed sequence of `AnalysisEvent`s is run through `decider()`, both listeners
/// print every resulting order, and the `DecisionMaker` is closed at the end.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut decision_maker = DecisionMaker::new();

    // listener #1: pretends to forward the orders to the Exchange
    decision_maker
        .add_listener("Sender", |orders| {
            orders.inspect(|order| println!(">>> sending to the Exchange: {:?}", order))
        })
        .await?;

    // listener #2: pretends to persist the orders
    decision_maker
        .add_listener("Logger", |orders| {
            orders.inspect(|order| println!("### saving to the storage: {:?}", order))
        })
        .await?;

    // the simulated incoming analysis: three rises, two falls, one rise and a final fall
    let price_deltas = [0.01, 0.02, 0.03, -0.01, -0.02, 0.01, -0.01];
    let incoming_events = stream::iter(price_deltas.map(|price_delta| AnalysisEvent { price_delta }));

    // orders are sent while the `decider()` stream is driven to completion
    let decisions = decision_maker.decider(incoming_events);
    decisions
        .for_each(|_| async {
            println!("Processed another incoming `AnalysisEvent`...");
        })
        .await;

    decision_maker.close().await;
    Ok(())
}