1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::prelude::*;
use rocket::fs::{relative, FileServer};
use rocket::serde::json::Json;
use rocket::{config::Shutdown, get, post, routes};
use slab::Slab;
use std::path::Path;

use crate::runtime::config;
use crate::runtime::AsyncMessage;
use crate::runtime::Pmt;

/// Collects the control-port route handlers defined in this file
/// (`index`, `handler_id`, and `handler_id_post`) into a single list.
///
/// `start_control_port` mounts the returned routes under `/api/`.
fn routes() -> Vec<rocket::Route> {
    routes![index, handler_id, handler_id_post]
}

/// Reports how many entries the managed inbox slab currently holds.
///
/// The response body has the form `number of Blocks <n>`, where `<n>` is the
/// slab's length.
#[get("/")]
fn index(boxes: &rocket::State<Slab<Option<mpsc::Sender<AsyncMessage>>>>) -> String {
    let count = boxes.len();
    format!("number of Blocks {:?}", count)
}

/// Invokes handler `handler` of block `blk` with a `Pmt::Null` payload.
///
/// The block's inbox is looked up in the managed slab, an
/// `AsyncMessage::Callback` is sent to it, and the reply is awaited on a
/// oneshot channel.
///
/// Returns the `Debug` rendering of the reply on success. Failures are
/// reported as short plain-text bodies instead of panicking:
///
/// * `block not found` if no inbox is stored under `blk`,
/// * `block not reachable` if sending to the inbox fails,
/// * `handler did not respond` if the reply sender is dropped before
///   producing a value.
#[get("/block/<blk>/call/<handler>")]
async fn handler_id(
    blk: usize,
    handler: usize,
    boxes: &rocket::State<Slab<Option<mpsc::Sender<AsyncMessage>>>>,
) -> String {
    // `send` needs a mutable sender, but managed state is only available by
    // shared reference, so work on a clone.
    let mut inbox = match boxes.get(blk) {
        Some(Some(s)) => s.clone(),
        _ => return "block not found".to_string(),
    };

    let (tx, rx) = oneshot::channel::<Pmt>();

    let request = AsyncMessage::Callback {
        port_id: handler,
        data: Pmt::Null,
        tx,
    };

    // A failed send must not panic the request handler.
    if inbox.send(request).await.is_err() {
        return "block not reachable".to_string();
    }

    // The reply sender may be dropped without ever answering.
    match rx.await {
        Ok(ret) => format!("{:?}", ret),
        Err(_) => "handler did not respond".to_string(),
    }
}

/// Invokes handler `handler` of block `blk`, passing the JSON-encoded `Pmt`
/// from the request body as the payload.
///
/// The block's inbox is looked up in the managed slab, an
/// `AsyncMessage::Callback` carrying the payload is sent to it, and the reply
/// is awaited on a oneshot channel.
///
/// Returns the `Debug` rendering of the reply on success. Failures are
/// reported as short plain-text bodies instead of panicking:
///
/// * `block not found` if no inbox is stored under `blk`,
/// * `block not reachable` if sending to the inbox fails,
/// * `handler did not respond` if the reply sender is dropped before
///   producing a value.
#[post("/block/<blk>/call/<handler>", data = "<pmt>")]
async fn handler_id_post(
    blk: usize,
    handler: usize,
    pmt: Json<Pmt>,
    boxes: &rocket::State<Slab<Option<mpsc::Sender<AsyncMessage>>>>,
) -> String {
    // `send` needs a mutable sender, but managed state is only available by
    // shared reference, so work on a clone.
    let mut inbox = match boxes.get(blk) {
        Some(Some(s)) => s.clone(),
        _ => return "block not found".to_string(),
    };

    let (tx, rx) = oneshot::channel::<Pmt>();

    let request = AsyncMessage::Callback {
        port_id: handler,
        data: pmt.into_inner(),
        tx,
    };

    // A failed send must not panic the request handler.
    if inbox.send(request).await.is_err() {
        return "block not reachable".to_string();
    }

    // The reply sender may be dropped without ever answering.
    match rx.await {
        Ok(ret) => format!("{:?}", ret),
        Err(_) => "handler did not respond".to_string(),
    }
}

/// Starts the control-port web server on a dedicated background thread.
///
/// Returns immediately without doing anything when `ctrlport_enable` is not
/// set in the runtime configuration. Otherwise a Rocket server is configured
/// with the address and port from `ctrlport_bind`, takes `inboxes` as managed
/// state, and serves the routes from `routes()` under `/api/`. Static files
/// are additionally served at `/` from the configured `frontend_path`, or,
/// if none is configured, from the `frontend/dist` directory when it exists.
///
/// The server runs on a current-thread Tokio runtime inside the spawned
/// thread; the thread's join handle is discarded. A launch failure is logged
/// rather than propagated.
///
/// # Panics
///
/// Panics if the control port is enabled but `ctrlport_bind` is `None`.
pub fn start_control_port(inboxes: Slab<Option<mpsc::Sender<AsyncMessage>>>) {
    if !config::config().ctrlport_enable {
        return;
    }

    let bind_addr = config::config().ctrlport_bind.unwrap();

    let mut rocket_cfg = rocket::config::Config::debug_default();
    // Ctrl-C handling is switched off in Rocket's shutdown settings.
    let shutdown = Shutdown {
        ctrlc: false,
        force: true,
        ..Default::default()
    };
    rocket_cfg.shutdown = shutdown;
    rocket_cfg.port = bind_addr.port();
    rocket_cfg.address = bind_addr.ip();

    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        let serve = async move {
            let base = rocket::custom(rocket_cfg)
                .manage(inboxes)
                .mount("/api/", routes());

            // An explicitly configured frontend path takes precedence over
            // the `frontend/dist` fallback directory.
            let server = if let Some(ref p) = config::config().frontend_path {
                base.mount("/", FileServer::from(p))
            } else if Path::new(relative!("frontend/dist")).is_dir() {
                base.mount("/", FileServer::from(relative!("frontend/dist")))
            } else {
                base
            };

            if let Err(e) = server.launch().await {
                info!("rocket server failed to start");
                info!("{:?}", e);
            }
        };

        rt.block_on(serve);
    });
}