forester_rs/runtime/forester/
serv.rs1mod routes;
2
3use crate::runtime::blackboard::BlackBoard;
4use crate::runtime::builder::ServerPort;
5use crate::tracer::{Tracer};
6use axum::routing::{get, post};
7use axum::{Router};
8
9use crate::runtime::forester::serv::routes::*;
10use crate::runtime::{RtOk, RtResult, RuntimeError};
11use axum::http::StatusCode;
12use axum::response::{IntoResponse, Response};
13use hyper::client::HttpConnector;
14use hyper::server::conn::AddrIncoming;
15use hyper::server::Builder;
16use hyper::{Body, Client};
17use serde::{Deserialize, Serialize};
18use std::net::SocketAddr;
19use std::sync::{Arc, Mutex};
20
21use tokio::sync::oneshot::Sender;
22use tokio::task::JoinHandle;
23use crate::runtime::env::RtEnv;
24
25#[derive(Clone)]
34pub struct HttpServ {
35 bb: Arc<Mutex<BlackBoard>>,
36 tracer: Arc<Mutex<Tracer>>,
37 client: Client<HttpConnector, Body>,
38}
39
40pub struct ServInfo {
43 pub status: JoinHandle<RtOk>,
44 pub serv_port: u16,
45 pub stop_cmd: StopCmd,
46}
47
48impl ServInfo {
49 pub fn stop(self) -> Result<(), ()> {
50 self.stop_cmd.send(())
51 }
52}
53
54pub type StopCmd = Sender<()>;
55
56impl HttpServ {
57 fn new(
58 bb: Arc<Mutex<BlackBoard>>,
59 tracer: Arc<Mutex<Tracer>>,
60 client: Client<HttpConnector, Body>,
61 ) -> Self {
62 Self { bb, tracer, client }
63 }
64}
65
66pub fn start(
77 rt: Arc<Mutex<RtEnv>>,
78 port: ServerPort,
79 bb: Arc<Mutex<BlackBoard>>,
80 tracer: Arc<Mutex<Tracer>>,
81) -> RtResult<ServInfo> {
82 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
83 let loc_port = if let ServerPort::Static(p) = port.clone() {
84 p
85 } else {
86 0
87 };
88 let rt = rt.lock()?;
89 let handle: JoinHandle<RtOk> = rt.runtime.spawn(async {
90 match bind(port) {
91 Ok(builder) => {
92 let client:Client<HttpConnector,Body> = hyper::Client::builder().build(HttpConnector::new());
93 let service = routing(HttpServ::new(bb, tracer, client))
94 .into_make_service();
95 let server = builder.serve(service);
96
97 debug!(target:"http_server", " the server is deployed to {} ", &server.local_addr().port());
98 let serv_with_shutdown = server.with_graceful_shutdown(async {
99 rx.await.ok();
100 });
101 if let Err(e) = serv_with_shutdown.await {
102 debug!(target:"http_server", "server error: {}", e);
103 Err(RuntimeError::IOError(format!("{}", e.to_string())))
104 } else {
105 Ok(())
106 }
107 }
108 Err(e) => {
109 debug!(target:"http_server", "server error: {:?}", e);
110 Err(RuntimeError::IOError(format!("{:?}", e)))
111 }
112 }
113 });
114
115 Ok(ServInfo {
116 status: handle,
117 serv_port: loc_port,
118 stop_cmd: tx,
119 })
120}
121fn bind(port: ServerPort) -> Result<Builder<AddrIncoming>, RuntimeError> {
122 match port {
123 ServerPort::None => Err(RuntimeError::Unexpected(
124 "the port for http server is not selected.".to_string(),
125 )),
126 ServerPort::Static(port) => {
127 axum::Server::try_bind(&SocketAddr::from(([127, 0, 0, 1], port)))
128 .map_err(|e| RuntimeError::IOError(e.to_string()))
129 }
130 }
131}
132
133fn routing(delegate: HttpServ) -> Router {
134 Router::new()
135 .route("/", get(|| async { "OK" }))
136 .route("/tracer/custom", post(trace))
137 .route("/tracer/print", get(print_trace))
138 .route("/bb/:key/lock", get(bb_lock))
139 .route("/bb/:key/unlock", get(bb_unlock))
140 .route("/bb/:key/locked", get(bb_is_locked))
141 .route("/bb/:key/contains", get(bb_contains))
142 .route("/bb/:key/take", get(bb_take))
143 .route("/bb/:key", post(bb_put))
144 .route("/bb/:key", get(bb_get))
145 .with_state(delegate)
146}
147
148fn err_handler<R>(r: RtResult<R>) -> Response
149where
150 R: IntoResponse,
151{
152 match r {
153 Ok(r) => r.into_response(),
154 Err(e) => {
155 let err_str = format!("{:?}", e);
156 debug!(target: "http_server", "internal error: {}",err_str);
157 (StatusCode::INTERNAL_SERVER_ERROR, err_str).into_response()
158 }
159 }
160}
161
162#[derive(Debug, Deserialize, Serialize)]
163pub(crate) struct CustomEvent {
164 text: String,
165 tick: usize,
166}