1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
mod routes;

use crate::runtime::blackboard::BlackBoard;
use crate::runtime::builder::ServerPort;
use crate::tracer::{Event, Tracer};
use axum::routing::{get, post};
use axum::{Json, Router};

use crate::runtime::forester::serv::routes::*;
use crate::runtime::{RtOk, RtResult, RuntimeError};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use hyper::client::HttpConnector;
use hyper::server::conn::AddrIncoming;
use hyper::server::Builder;
use hyper::{Body, Client};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

/// The struct defines the http server that can be used to interface the remote actions.
/// By default, the server is deployed to the localhost.
/// The port is selected dynamically if it is not specified.
///
/// #Notes
/// The main purpose of the server is to provide the api for blackboard and tracer.
/// The server is started automatically if there is at least one remote action registered.
/// When forester is finished it is automatically stops the server as well.
#[derive(Clone)]
pub struct HttpServ {
    bb: Arc<Mutex<BlackBoard>>,
    tracer: Arc<Mutex<Tracer>>,
    client: Client<HttpConnector, Body>,
}

/// The struct defines the information of the server.
/// It is used to stop the server and get the status of the server.
pub struct ServInfo {
    pub status: JoinHandle<RtOk>,
    pub serv_port: u16,
    pub stop_cmd: StopCmd,
}

impl ServInfo {
    pub fn stop(mut self) -> Result<(), ()> {
        self.stop_cmd.send(())
    }
}

pub type StopCmd = Sender<()>;

impl HttpServ {
    fn new(
        bb: Arc<Mutex<BlackBoard>>,
        tracer: Arc<Mutex<Tracer>>,
        client: Client<HttpConnector, Body>,
    ) -> Self {
        Self { bb, tracer, client }
    }
}

/// starts the server for access from remote actions
///
/// # Parameters
/// - `rt` - the runtime for the server. Typically can be obtained from Forester instance
/// - `port` - the port for the server. If it is not specified, the port is selected dynamically.
/// - `bb` - the blackboard that is used to store the data
/// - `tracer` - the tracer that is used to store the trace events
///
/// # Returns
/// the information of the server
pub fn start(
    rt: &Runtime,
    port: ServerPort,
    bb: Arc<Mutex<BlackBoard>>,
    tracer: Arc<Mutex<Tracer>>,
) -> RtResult<ServInfo> {
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
    let loc_port = if let ServerPort::Static(p) = port.clone() {
        p
    } else {
        0
    };
    let handle: JoinHandle<RtOk> = rt.spawn(async {
        match bind(port) {
            Ok(builder) => {
                let client:Client<HttpConnector,Body> = hyper::Client::builder().build(HttpConnector::new());
                let service = routing(HttpServ::new(bb, tracer, client))
                    .into_make_service();
                let server = builder.serve(service);

                debug!(target:"http_server", " the server is deployed to {} ", &server.local_addr().port());
                let serv_with_shutdown = server.with_graceful_shutdown(async {
                    rx.await.ok();
                });
                if let Err(e) = serv_with_shutdown.await {
                    debug!(target:"http_server", "server error: {}", e);
                    Err(RuntimeError::IOError(format!("{}", e.to_string())))
                } else {
                    Ok(())
                }
            }
            Err(e) => {
                debug!(target:"http_server", "server error: {:?}", e);
                Err(RuntimeError::IOError(format!("{:?}", e)))
            }
        }
    });

    Ok(ServInfo {
        status: handle,
        serv_port: loc_port,
        stop_cmd: tx,
    })
}
fn bind(port: ServerPort) -> Result<Builder<AddrIncoming>, RuntimeError> {
    match port {
        ServerPort::None => Err(RuntimeError::Unexpected(
            "the port for http server is not selected.".to_string(),
        )),
        ServerPort::Static(port) => {
            axum::Server::try_bind(&SocketAddr::from(([127, 0, 0, 1], port)))
                .map_err(|e| RuntimeError::IOError(e.to_string()))
        }
    }
}

fn routing(delegate: HttpServ) -> Router {
    Router::new()
        .route("/", get(|| async { "OK" }))
        .route("/tracer/custom", post(trace))
        .route("/tracer/print", get(print_trace))
        .route("/bb/:key/lock", get(bb_lock))
        .route("/bb/:key/unlock", get(bb_unlock))
        .route("/bb/:key/locked", get(bb_is_locked))
        .route("/bb/:key/contains", get(bb_contains))
        .route("/bb/:key/take", get(bb_take))
        .route("/bb/:key", post(bb_put))
        .route("/bb/:key", get(bb_get))
        .with_state(delegate)
}

fn err_handler<R>(r: RtResult<R>) -> Response
where
    R: IntoResponse,
{
    match r {
        Ok(r) => r.into_response(),
        Err(e) => {
            let err_str = format!("{:?}", e);
            debug!(target: "http_server", "internal error: {}",err_str);
            (StatusCode::INTERNAL_SERVER_ERROR, err_str).into_response()
        }
    }
}

#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct CustomEvent {
    text: String,
    tick: usize,
}