1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
mod credentials;
mod subscription;
mod transport;
pub use credentials::Credentials;
pub use subscription::{Subscription, SubscriptionError};
pub use transport::http::{Http, HttpError};
#[cfg(target_family = "unix")]
pub use transport::uds::{Uds, UdsError};
pub use transport::websocket::{WebSocket, WebSocketError};
use crate::rpc::{sub::SubscriptionRequest, Rpc, RpcResponse};
use log::{debug, info, trace};
use serde::de::DeserializeOwned;
use thiserror::Error;
/// Transport capability: send one request and receive its reply.
///
/// `Connection::call` hands the JSON-serialized RPC to `request` and parses
/// the returned string as the JSON response.
pub trait Request {
/// Sends `cmd` over the transport and returns the raw response text,
/// or a `ConnectionError` if the transport fails.
fn request(&mut self, cmd: String) -> Result<String, ConnectionError>;
}
/// Transport capability required by `Connection::subscribe`.
pub trait Subscribe {
// NOTE(review): presumably waits for the next notification pushed by the
// node — `read_next` is not called in this part of the file; confirm
// against the transport implementations.
/// Reads the next message from the transport as a string.
fn read_next(&mut self) -> Result<String, ConnectionError>;
/// Produces another transport value of the same type.
///
/// `Connection::subscribe` calls this to give each new subscription its
/// own transport instead of sharing the caller's.
fn fork(&self) -> Result<Self, ConnectionError>
where
Self: Sized;
}
/// RPC client that pairs a transport with a pool of request ids.
pub struct Connection<T: Request> {
/// Transport used to send requests and receive responses.
transport: T,
/// Ids assigned to outgoing requests. `new` fills it with `0..1000`;
/// `call` takes an id from the front and puts it back at the end.
id_pool: std::collections::VecDeque<usize>,
}
impl<T> Connection<T>
where
T: Request,
{
pub fn new(transport: T) -> Self {
Self {
transport,
id_pool: (0..1000).collect(),
}
}
pub fn call<U>(&mut self, mut rpc: Rpc<U>) -> Result<U, ConnectionError>
where
U: DeserializeOwned + std::fmt::Debug,
{
if let Some(id) = self.id_pool.pop_front() {
trace!("Using id {} for request", id);
rpc.id = id;
debug!("Calling rpc method: {:?}", &rpc);
self.id_pool.push_back(id);
let result_data = self.transport.request(serde_json::to_string(&rpc)?)?;
let result = serde_json::from_str::<RpcResponse<U>>(&result_data)?;
Ok(result.result)
} else {
Err(ConnectionError::NoTicketId)
}
}
}
impl<T> Connection<T>
where
    T: Request + Subscribe,
{
    /// Opens a subscription described by `sub_request`.
    ///
    /// A separate `Connection` is built from a fork of this connection's
    /// transport and a copy of its id pool. The subscription RPC is issued on
    /// that new connection, and the value it returns becomes the id of the
    /// resulting `Subscription`, which also takes ownership of the new
    /// connection.
    ///
    /// # Errors
    ///
    /// Returns the error from `fork` if the transport cannot be forked, or
    /// any error produced by `call` while issuing the subscription RPC.
    pub fn subscribe<U>(
        &mut self,
        sub_request: SubscriptionRequest<U>,
    ) -> Result<Subscription<T, U>, ConnectionError>
    where
        U: DeserializeOwned + std::fmt::Debug,
    {
        info!("Starting a new subscription");

        // Fork first: if it fails, the id pool is never copied.
        let transport = self.transport.fork()?;
        let id_pool = self.id_pool.clone();
        let mut connection = Connection { transport, id_pool };

        let id = connection.call(sub_request.rpc)?;
        Ok(Subscription {
            id,
            connection,
            result_type: std::marker::PhantomData,
        })
    }
}
/// Error made of a numeric code and a message; its `Display` output is the
/// message alone.
///
/// Converted into `ConnectionError::JsonRpc` via `From`.
// NOTE(review): presumably mirrors the `error` object of a JSON-RPC response —
// nothing in this part of the file constructs it; confirm where it is built.
#[derive(Debug, Error)]
#[error("{message}")]
pub struct JsonError {
/// Numeric error code.
code: i32,
/// Error text, used as the `Display` output.
message: String,
}
/// Every error a `Connection` can return: transport failures, JSON errors,
/// (de)serialization failures and id-pool exhaustion.
// NOTE(review): the clippy allow below suggests one variant is much larger
// than the others; which one is not visible from here — confirm before boxing.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Error)]
pub enum ConnectionError {
/// Failure reported by the WebSocket transport; displayed unchanged.
#[error("{0}")]
WebSocketError(#[from] WebSocketError),
/// Failure reported by the HTTP transport; displayed unchanged.
#[error("{0}")]
HttpError(#[from] HttpError),
/// Failure reported by the Unix domain socket transport (unix targets only);
/// displayed unchanged.
#[cfg(target_family = "unix")]
#[error("{0}")]
UdsError(#[from] UdsError),
/// A `JsonError`, shown with its `Debug` representation.
#[error("Node Response Error: {0:?}")]
JsonRpc(#[from] JsonError),
/// `serde_json` failed to serialize a request or deserialize a response.
#[error("Connector De-/Serialization Error: {0}")]
Serde(#[from] serde_json::Error),
/// Returned by `Connection::call` when the id pool has no id left.
#[error("Connector Error: Maximum number of connections reached")]
NoTicketId,
}