jsonrpsee_core/server/
helpers.rs1use std::io;
28use std::time::Duration;
29
30use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload};
31use tokio::sync::mpsc;
32
33use super::{DisconnectError, SendTimeoutError, SubscriptionMessage, TrySendError};
34
35#[derive(Debug, Clone)]
47pub struct BoundedWriter {
48 max_len: usize,
49 buf: Vec<u8>,
50}
51
52impl BoundedWriter {
53 pub fn new(max_len: usize) -> Self {
55 Self { max_len, buf: Vec::with_capacity(128) }
56 }
57
58 pub fn into_bytes(self) -> Vec<u8> {
60 self.buf
61 }
62}
63
64impl<'a> io::Write for &'a mut BoundedWriter {
65 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
66 let len = self.buf.len() + buf.len();
67 if self.max_len >= len {
68 self.buf.extend_from_slice(buf);
69 Ok(buf.len())
70 } else {
71 Err(io::Error::new(io::ErrorKind::OutOfMemory, "Memory capacity exceeded"))
72 }
73 }
74
75 fn flush(&mut self) -> io::Result<()> {
76 Ok(())
77 }
78}
79
80#[derive(Clone, Debug)]
82pub struct MethodSink {
83 tx: mpsc::Sender<String>,
85 max_response_size: u32,
87}
88
89impl MethodSink {
90 pub fn new(tx: mpsc::Sender<String>) -> Self {
92 MethodSink { tx, max_response_size: u32::MAX }
93 }
94
95 pub fn new_with_limit(tx: mpsc::Sender<String>, max_response_size: u32) -> Self {
97 MethodSink { tx, max_response_size }
98 }
99
100 pub fn is_closed(&self) -> bool {
102 self.tx.is_closed()
103 }
104
105 pub async fn closed(&self) {
111 self.tx.closed().await
112 }
113
114 pub const fn max_response_size(&self) -> u32 {
116 self.max_response_size
117 }
118
119 pub fn try_send(&mut self, msg: String) -> Result<(), TrySendError> {
124 self.tx.try_send(msg).map_err(Into::into)
125 }
126
127 pub async fn send(&self, msg: String) -> Result<(), DisconnectError> {
129 self.tx.send(msg).await.map_err(Into::into)
130 }
131
132 pub async fn send_error<'a>(&self, id: Id<'a>, err: ErrorObject<'a>) -> Result<(), DisconnectError> {
134 let payload = ResponsePayload::<()>::error_borrowed(err);
135 let json = serde_json::to_string(&Response::new(payload, id)).expect("valid JSON; qed");
136
137 self.send(json).await
138 }
139
140 pub async fn send_timeout(&self, msg: String, timeout: Duration) -> Result<(), SendTimeoutError> {
142 self.tx.send_timeout(msg, timeout).await.map_err(Into::into)
143 }
144
145 pub fn capacity(&self) -> usize {
147 self.tx.capacity()
148 }
149
150 pub fn max_capacity(&self) -> usize {
152 self.tx.max_capacity()
153 }
154
155 pub async fn has_capacity(&self) -> Result<(), DisconnectError> {
157 match self.tx.reserve().await {
158 Ok(_) => Ok(()),
161 Err(_) => Err(DisconnectError(SubscriptionMessage::empty())),
162 }
163 }
164}
165
166pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
169 match serde_json::from_slice::<InvalidRequest>(data) {
170 Ok(InvalidRequest { id }) => (id, ErrorCode::InvalidRequest),
171 Err(_) => (Id::Null, ErrorCode::ParseError),
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use crate::server::BoundedWriter;
178 use jsonrpsee_types::{Id, Response, ResponsePayload};
179
180 #[test]
181 fn bounded_serializer_work() {
182 let mut writer = BoundedWriter::new(100);
183 let result = ResponsePayload::success(&"success");
184 let rp = &Response::new(result, Id::Number(1));
185
186 assert!(serde_json::to_writer(&mut writer, rp).is_ok());
187 assert_eq!(String::from_utf8(writer.into_bytes()).unwrap(), r#"{"jsonrpc":"2.0","id":1,"result":"success"}"#);
188 }
189
190 #[test]
191 fn bounded_serializer_cap_works() {
192 let mut writer = BoundedWriter::new(100);
193 assert!(serde_json::to_writer(&mut writer, &"x".repeat(99)).is_err());
195 }
196}