1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::sync::Arc;
use serde::{de::DeserializeOwned, Serialize};
use smol::channel::Sender;
use smol::prelude::*;
use crate::MelnetError;
/// A handler for incoming requests whose body decodes to `Req` and whose reply is a `Resp`.
///
/// Implementors must be `Send + Sync`. Any suitable closure taking a
/// [`Request<Req, Resp>`] gets this trait through a blanket implementation.
pub trait Endpoint<Req, Resp>: Send + Sync
where
    Req: DeserializeOwned,
    Resp: Serialize,
{
    /// Handles a single request.
    ///
    /// Nothing is returned here; the reply travels through the `response` field of `req`.
    fn respond(&self, req: Request<Req, Resp>);
}
/// Lets any thread-safe, `'static` closure that accepts a [`Request`] act as an [`Endpoint`].
impl<Req, Resp, F> Endpoint<Req, Resp> for F
where
    Req: DeserializeOwned,
    Resp: Serialize,
    F: Fn(Request<Req, Resp>) + Send + Sync + 'static,
{
    /// Forwards the request to the wrapped closure.
    fn respond(&self, req: Request<Req, Resp>) {
        self(req)
    }
}
/// Wraps a typed [`Endpoint`] into a type-erased [`BoxedResponder`] that operates on raw bytes.
///
/// For every invocation the returned closure:
/// 1. decodes the bytes into a `Req` with `stdcode::deserialize`;
/// 2. hands the endpoint a [`Request`] holding the decoded body, a clone of `state`, and the
///    sending half of a fresh capacity-1 channel;
/// 3. returns a future that waits for the reply on that channel and encodes it with
///    `stdcode::serialize`.
///
/// The future resolves to `Err(MelnetError::InternalServerError)` when the request bytes cannot
/// be decoded, when the channel closes without a reply, or when the reply cannot be encoded.
/// An `Err` sent by the endpoint itself is passed through unchanged.
pub(crate) fn responder_to_closure<
    Req: DeserializeOwned + Send,
    Resp: Serialize + Send + 'static,
>(
    state: crate::NetState,
    responder: impl Endpoint<Req, Resp> + 'static + Send,
) -> BoxedResponder {
    let clos = move |bts: &[u8]| {
        let decoded: Result<Req, _> = stdcode::deserialize(bts);
        match decoded {
            Ok(decoded) => {
                // Capacity 1 is enough: a `ResponseChan` is consumed by its single `send`.
                let (respond, recv_respond) = smol::channel::bounded(1);
                let request = Request {
                    state: state.clone(),
                    body: decoded,
                    response: ResponseChan { respond },
                };
                responder.respond(request);
                let response_fut = async move {
                    recv_respond
                        .recv()
                        .await
                        // A closed channel means the endpoint never answered.
                        .unwrap_or(Err(MelnetError::InternalServerError))
                        .and_then(|v| {
                            // Report an encoding failure like every other internal failure
                            // instead of panicking inside the response future.
                            stdcode::serialize(&v).map_err(|e| {
                                log::warn!("issue encoding response: {}", e);
                                MelnetError::InternalServerError
                            })
                        })
                };
                response_fut.boxed()
            }
            Err(e) => {
                log::warn!("issue decoding request: {}", e);
                async { Err(MelnetError::InternalServerError) }.boxed()
            }
        }
    };
    BoxedResponder(Arc::new(clos))
}
/// A type-erased request handler: takes the raw request bytes and returns a boxed future
/// that resolves to the raw response bytes or an error.
///
/// Cloning only clones the inner `Arc`, so all clones share the same underlying closure.
// The trait-object type below is long but used only here, hence the clippy allowance.
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub(crate) struct BoxedResponder(
/// The shared handler closure.
pub Arc<dyn Fn(&[u8]) -> smol::future::Boxed<crate::Result<Vec<u8>>> + Send + Sync>,
);
/// A decoded incoming request, bundled with what an [`Endpoint`] needs to answer it.
///
/// The type is `#[must_use]`, so a `Request` value that is silently discarded triggers a
/// compiler warning.
#[must_use]
pub struct Request<Req, Resp>
where
    Req: DeserializeOwned,
    Resp: Serialize,
{
    /// The deserialized request payload.
    pub body: Req,
    /// The network state handed to the endpoint alongside the request.
    pub state: crate::NetState,
    /// The handle through which the reply (or an error) is sent back.
    pub response: ResponseChan<Resp>,
}
/// The sending half of the channel used to deliver the reply to a single request.
pub struct ResponseChan<Resp>
where
    Resp: Serialize,
{
    /// Channel sender carrying either the response value or an error.
    respond: Sender<crate::Result<Resp>>,
}
impl<Resp> ResponseChan<Resp>
where
    Resp: Serialize,
{
    /// Delivers `resp` as the reply to the request, consuming this handle.
    ///
    /// The send is non-blocking (`try_send`), and a failed send is deliberately ignored.
    pub fn send(self, resp: crate::Result<Resp>) {
        // Best effort: if nobody is listening any more there is nothing useful to do.
        self.respond.try_send(resp).ok();
    }
}