1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::prelude::*;
use rocket::fs::{relative, FileServer};
use rocket::serde::json::Json;
use rocket::{config::Shutdown, get, post, routes};
use slab::Slab;
use std::path::Path;
use crate::runtime::config;
use crate::runtime::AsyncMessage;
use crate::runtime::Pmt;
fn routes() -> Vec<rocket::Route> {
routes![index, handler_id, handler_id_post]
}
#[get("/")]
fn index(boxes: &rocket::State<Slab<Option<mpsc::Sender<AsyncMessage>>>>) -> String {
format!("number of Blocks {:?}", boxes.len())
}
#[get("/block/<blk>/call/<handler>")]
async fn handler_id(
blk: usize,
handler: usize,
boxes: &rocket::State<Slab<Option<mpsc::Sender<AsyncMessage>>>>,
) -> String {
let mut b = match boxes.get(blk) {
Some(Some(s)) => s.clone(),
_ => return "block not found".to_string(),
};
let (tx, rx) = oneshot::channel::<Pmt>();
b.send(AsyncMessage::Callback {
port_id: handler,
data: Pmt::Null,
tx,
})
.await
.unwrap();
let ret = rx.await.unwrap();
format!("{:?}", ret)
}
#[post("/block/<blk>/call/<handler>", data = "<pmt>")]
async fn handler_id_post(
blk: usize,
handler: usize,
pmt: Json<Pmt>,
boxes: &rocket::State<Slab<Option<mpsc::Sender<AsyncMessage>>>>,
) -> String {
let mut b = match boxes.get(blk) {
Some(Some(s)) => s.clone(),
_ => return "block not found".to_string(),
};
let (tx, rx) = oneshot::channel::<Pmt>();
b.send(AsyncMessage::Callback {
port_id: handler,
data: pmt.into_inner(),
tx,
})
.await
.unwrap();
let ret = rx.await.unwrap();
format!("{:?}", ret)
}
pub fn start_control_port(inboxes: Slab<Option<mpsc::Sender<AsyncMessage>>>) {
if !config::config().ctrlport_enable {
return;
}
let addr = config::config().ctrlport_bind.unwrap();
let mut config = rocket::config::Config::debug_default();
config.address = addr.ip();
config.port = addr.port();
config.shutdown = Shutdown {
ctrlc: false,
force: true,
..Default::default()
};
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async move {
let mut r = rocket::custom(config)
.manage(inboxes)
.mount("/api/", routes());
if let Some(ref p) = config::config().frontend_path {
r = r.mount("/", FileServer::from(p));
} else if Path::new(relative!("frontend/dist")).is_dir() {
r = r.mount("/", FileServer::from(relative!("frontend/dist")))
}
if let Err(e) = r.launch().await {
info!("rocket server failed to start");
info!("{:?}", e);
}
});
});
}