1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
pub mod builder;
pub mod messages;
use std::{future::Future, pin::Pin};
use tokio::{
net::{tcp::OwnedReadHalf, ToSocketAddrs},
runtime::Runtime,
sync::{broadcast, mpsc},
};
use crate::{
io::{
tcp::TcpListener, Component, ComponentFuture, ComponentReader, DataFrameReceiver, IoRead,
},
Actor, Ask, AsyncAsk, TcpRequest,
};
pub struct World {
rt: Runtime,
inputs: Vec<FnOnceLoop>,
}
impl From<Runtime> for World {
fn from(value: Runtime) -> Self {
Self {
rt: value,
inputs: Vec::new(),
}
}
}
pub type WorldResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
type FnOnceLoopResult = Pin<Box<dyn Future<Output = WorldResult> + Send + Sync>>;
type FnOnceLoop = Box<dyn FnOnce(broadcast::Receiver<()>) -> FnOnceLoopResult + Send + Sync>;
impl World {
pub fn new() -> std::io::Result<World> {
let rt = Runtime::new()?;
Ok(World { rt, inputs: vec![] })
}
pub fn with_state<Fut: Future>(&self, f: Fut) -> Fut::Output
where {
self.rt.block_on(f)
}
pub fn tcp_component<A, O>(
&self,
address: impl ToSocketAddrs,
actor: impl Actor + Ask<TcpRequest, Result = A>,
) -> std::io::Result<impl Component>
where
A: Actor + AsyncAsk<O::Request>,
O: DataFrameReceiver,
O::Frame: IoRead<OwnedReadHalf>,
{
self.rt
.block_on(TcpListener::<_, A, O::Frame, O>::new(address, actor))
}
pub fn on_input<C: Component + 'static>(&mut self, mut component: C) {
let event_loop: FnOnceLoop = Box::new(move |mut shutdown: broadcast::Receiver<()>| {
Box::pin(async move {
loop {
tokio::select! {
result = component.accept() => {
let (reader, actor) = result.unwrap();
let connection = actor.start();
tokio::spawn(async move {
loop {
match reader.read().await {
Ok(Some(payload)) => {
let _ = connection.async_ask(payload).await;
}
Ok(None) => {
// we can no longer read from the socket
tracing::info!("Disconnected from tcp read pipe");
break;
}
Err(err) => {
tracing::error!(error = err.to_string(), "Disconnected from tcp read pipe");
break;
}
}
}
if let Err(err) = connection.await {
match err {
crate::address::IntoFutureError::MailboxClosed => tracing::trace!(actor = <C as ComponentFuture>::Actor::name(), "already closed"),
crate::address::IntoFutureError::Paniced => tracing::error!(actor = <C as ComponentFuture>::Actor::name(), "paniced during close"),
}
}
});
}
_ = shutdown.recv() => {
break;
}
}
}
component.shutdown().await;
Ok(())
})
});
self.inputs.push(event_loop);
}
// pub fn with_terminal_input<Act: Actor + Handler<Stdin>>(mut actor: Act) -> Self {}
// pub fn on_input<I, Input>(&self, machine: I, callback: F)
// where
// I: FnOnce(),
// F: Fn(Input, State),
// {
// let _state = self.state.clone();
// self.rt.spawn(async move {
// let state = _state;
// machine.handle(state);
// loop {
// }
// })
// }
/// Block the current runtime until one of the possible senarios complete:
///
/// 1. All inputs are turned off; thus we completed executing the program
/// 2. Ctrl+C is selected, which we will communicate with the actor system to start cleaning up and completing tasks
/// 3. Ctrl+C (times 2): We will not wait for the actor system to clean up and will forcefully exit the system
///
/// Upon exiting, the current inputs will be turned off (if they aren't already)
/// and the runtime will be shutdown.
pub fn block_until_completion(self) {
self.rt.block_on(async move {
let shutdown = tokio::signal::ctrl_c();
let (notify_shutdown, _) = broadcast::channel::<()>(1);
let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel::<()>(1);
let mut inputs = vec![];
for input in self.inputs {
let reciever = notify_shutdown.subscribe();
inputs.push(tokio::spawn(async move {
if let Err(err) = input(reciever).await {
tracing::error!(error = err.to_string(), "Failed to accept input");
}
}));
}
tokio::spawn(async move {
let _ = shutdown.await;
drop(notify_shutdown);
});
for input in inputs {
let _ = input.await;
}
drop(shutdown_complete_tx);
let _ = shutdown_complete_rx.recv().await;
});
}
}