1extern crate tetsy_jsonrpc_tcp_server;
20extern crate tetsy_jsonrpc_core;
21extern crate vapory_types;
22extern crate tetsy_keccak_hash as hash;
23extern crate parking_lot;
24
25#[macro_use] extern crate log;
26
27#[cfg(test)] extern crate tokio;
28#[cfg(test)] extern crate tokio_io;
29#[cfg(test)] extern crate env_logger;
30
31mod traits;
32
33pub use traits::{
34 JobDispatcher, PushWorkHandler, Error, ServiceConfiguration,
35};
36
37use tetsy_jsonrpc_tcp_server::{
38 Server as JsonRpcServer, ServerBuilder as JsonRpcServerBuilder,
39 RequestContext, MetaExtractor, Dispatcher, PushMessageError,
40};
41use tetsy_jsonrpc_core::{MetaIoHandler, Params, to_value, Value, Metadata, Compatibility, IoDelegate};
42use std::sync::Arc;
43
44use std::net::SocketAddr;
45use std::collections::{HashSet, HashMap};
46use hash::keccak;
47use vapory_types::H256;
48use parking_lot::RwLock;
49
50type RpcResult = Result<tetsy_jsonrpc_core::Value, tetsy_jsonrpc_core::Error>;
51
52const NOTIFY_COUNTER_INITIAL: u32 = 16;
53
54pub struct Stratum {
56 rpc_server: Option<JsonRpcServer>,
60 implementation: Arc<StratumImpl>,
64 tcp_dispatcher: Dispatcher,
68}
69
70impl Stratum {
71 pub fn start(
72 addr: &SocketAddr,
73 dispatcher: Arc<dyn JobDispatcher>,
74 secret: Option<H256>,
75 ) -> Result<Arc<Stratum>, Error> {
76
77 let implementation = Arc::new(StratumImpl {
78 subscribers: RwLock::default(),
79 job_queue: RwLock::default(),
80 dispatcher,
81 workers: Arc::new(RwLock::default()),
82 secret,
83 notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
84 });
85
86 let mut delegate = IoDelegate::<StratumImpl, SocketMetadata>::new(implementation.clone());
87 delegate.add_method_with_meta("mining.subscribe", StratumImpl::subscribe);
88 delegate.add_method_with_meta("mining.authorize", StratumImpl::authorize);
89 delegate.add_method_with_meta("mining.submit", StratumImpl::submit);
90 let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(Compatibility::Both);
91 handler.extend_with(delegate);
92
93 let server_builder = JsonRpcServerBuilder::new(handler);
94 let tcp_dispatcher = server_builder.dispatcher();
95 let server_builder = server_builder.session_meta_extractor(PeerMetaExtractor::new(tcp_dispatcher.clone()));
96 let server = server_builder.start(addr)?;
97
98 let stratum = Arc::new(Stratum {
99 rpc_server: Some(server),
100 implementation,
101 tcp_dispatcher,
102 });
103
104 Ok(stratum)
105 }
106}
107
108impl PushWorkHandler for Stratum {
109 fn push_work_all(&self, payload: String) {
110 self.implementation.push_work_all(payload, &self.tcp_dispatcher)
111 }
112}
113
114impl Drop for Stratum {
115 fn drop(&mut self) {
116 self.rpc_server.take().map(|server| server.close());
118 }
119}
120
121struct StratumImpl {
122 subscribers: RwLock<Vec<SocketAddr>>,
124 job_queue: RwLock<HashSet<SocketAddr>>,
126 dispatcher: Arc<dyn JobDispatcher>,
128 workers: Arc<RwLock<HashMap<SocketAddr, String>>>,
130 secret: Option<H256>,
132 notify_counter: RwLock<u32>,
134}
135
136impl StratumImpl {
137 fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult {
139 use std::str::FromStr;
140
141 self.subscribers.write().push(meta.addr().clone());
142 self.job_queue.write().insert(meta.addr().clone());
143 trace!(target: "stratum", "Subscription request from {:?}", meta.addr());
144
145 Ok(match self.dispatcher.initial() {
146 Some(initial) => match tetsy_jsonrpc_core::Value::from_str(&initial) {
147 Ok(val) => Ok(val),
148 Err(e) => {
149 warn!(target: "stratum", "Invalid payload: '{}' ({:?})", &initial, e);
150 to_value(&[0u8; 0])
151 },
152 },
153 None => to_value(&[0u8; 0]),
154 }.expect("Empty slices are serializable; qed"))
155 }
156
157 fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
159 params.parse::<(String, String)>().map(|(worker_id, secret)| {
160 if let Some(valid_secret) = self.secret {
161 let hash = keccak(secret);
162 if hash != valid_secret {
163 return to_value(&false);
164 }
165 }
166 trace!(target: "stratum", "New worker #{} registered", worker_id);
167 self.workers.write().insert(meta.addr().clone(), worker_id);
168 to_value(true)
169 }).map(|v| v.expect("Only true/false is returned and it's always serializable; qed"))
170 }
171
172 fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult {
174 Ok(match params {
175 Params::Array(vals) => {
176 match self.dispatcher.submit(vals.iter().skip(2)
178 .filter_map(|val| match *val {
179 Value::String(ref s) => Some(s.to_owned()),
180 _ => None
181 })
182 .collect::<Vec<String>>()) {
183 Ok(()) => {
184 self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed"));
185 to_value(true)
186 },
187 Err(submit_err) => {
188 warn!("Error while submitting share: {:?}", submit_err);
189 to_value(false)
190 }
191 }
192 },
193 _ => {
194 trace!(target: "stratum", "Invalid submit work format {:?}", params);
195 to_value(false)
196 }
197 }.expect("Only true/false is returned and it's always serializable; qed"))
198 }
199
200 fn update_peers(&self, tcp_dispatcher: &Dispatcher) {
202 if let Some(job) = self.dispatcher.job() {
203 self.push_work_all(job, tcp_dispatcher)
204 }
205 }
206
207 fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) {
208 let hup_peers = {
209 let workers = self.workers.read();
210 let next_request_id = {
211 let mut counter = self.notify_counter.write();
212 if *counter == ::std::u32::MAX {
213 *counter = NOTIFY_COUNTER_INITIAL;
214 } else {
215 *counter = *counter + 1
216 }
217 *counter
218 };
219
220 let mut hup_peers = HashSet::new();
221 let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload);
222 trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
223 for (addr, _) in workers.iter() {
224 trace!(target: "stratum", "pushing work to {}", addr);
225 match tcp_dispatcher.push_message(addr, workers_msg.clone()) {
226 Err(PushMessageError::NoSuchPeer) => {
227 trace!(target: "stratum", "Worker no longer connected: {}", addr);
228 hup_peers.insert(addr.clone());
229 },
230 Err(e) => {
231 warn!(target: "stratum", "Unexpected transport error: {:?}", e);
232 },
233 Ok(_) => {},
234 }
235 }
236 hup_peers
237 };
238
239 if !hup_peers.is_empty() {
240 let mut workers = self.workers.write();
241 for hup_peer in hup_peers {
242 workers.remove(&hup_peer);
243 }
244 }
245 }
246}
247
248#[derive(Clone)]
249pub struct SocketMetadata {
250 addr: SocketAddr,
251 tcp_dispatcher: Option<Dispatcher>,
255}
256
257impl Default for SocketMetadata {
258 fn default() -> Self {
259 SocketMetadata {
260 addr: "0.0.0.0:0".parse().unwrap(),
261 tcp_dispatcher: None,
262 }
263 }
264}
265
266impl SocketMetadata {
267 pub fn addr(&self) -> &SocketAddr {
268 &self.addr
269 }
270}
271
272impl Metadata for SocketMetadata { }
273
274pub struct PeerMetaExtractor {
275 tcp_dispatcher: Dispatcher,
276}
277
278impl PeerMetaExtractor {
279 fn new(tcp_dispatcher: Dispatcher) -> Self {
280 PeerMetaExtractor {
281 tcp_dispatcher,
282 }
283 }
284}
285
286impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
287 fn extract(&self, context: &RequestContext) -> SocketMetadata {
288 SocketMetadata {
289 addr: context.peer_addr,
290 tcp_dispatcher: Some(self.tcp_dispatcher.clone()),
291 }
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{SocketAddr, Shutdown};
    use std::sync::Arc;

    use tokio::{io, runtime::Runtime, timer::timeout::{self, Timeout}, net::TcpStream};
    use tetsy_jsonrpc_core::futures::{Future, future};

    /// Dispatcher that accepts every submission and implements nothing else.
    pub struct VoidManager;

    impl JobDispatcher for VoidManager {
        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Connects to `addr`, sends `data` followed by a newline, shuts down the
    /// write half of the connection and returns everything the server sends
    /// back.
    fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
        let mut runtime = Runtime::new().expect("Tokio Runtime should be created with no errors");

        let mut request = data.as_bytes().to_vec();
        request.extend(b"\n");

        let exchange = TcpStream::connect(addr)
            .and_then(move |stream| io::write_all(stream, request))
            .and_then(|(stream, _)| {
                stream.shutdown(Shutdown::Write).unwrap();
                io::read_to_end(stream, Vec::with_capacity(2048))
            })
            .and_then(|(_stream, response)| future::ok(response));

        runtime.block_on(exchange).expect("Runtime should run with no errors")
    }

    #[test]
    fn can_be_started() {
        let listen_addr = "127.0.0.1:19980".parse().unwrap();
        let stratum = Stratum::start(&listen_addr, Arc::new(VoidManager), None);
        assert!(stratum.is_ok());
    }

    #[test]
    fn records_subscriber() {
        let _ = ::env_logger::try_init();

        let addr = "127.0.0.1:19985".parse().unwrap();
        let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap();
        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#;
        dummy_request(&addr, request);
        assert_eq!(1, stratum.implementation.subscribers.read().len());
    }

    /// Dispatcher with a configurable initial payload that accepts every
    /// submission.
    struct DummyManager {
        initial_payload: String
    }

    impl DummyManager {
        /// Shared manager with the default initial payload.
        fn new() -> Arc<DummyManager> {
            Arc::new(Self::build())
        }

        /// Manager with the default initial payload.
        fn build() -> DummyManager {
            DummyManager { initial_payload: r#"[ "dummy payload" ]"#.to_owned() }
        }

        /// Replaces the initial payload, builder-style.
        fn of_initial(mut self, new_initial: &str) -> DummyManager {
            self.initial_payload = new_initial.to_owned();
            self
        }
    }

    impl JobDispatcher for DummyManager {
        fn initial(&self) -> Option<String> {
            Some(self.initial_payload.clone())
        }

        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Returns `origin` with a trailing newline appended.
    fn terminated_str(origin: &'static str) -> String {
        format!("{}\n", origin)
    }

    #[test]
    fn receives_initial_payload() {
        let addr = "127.0.0.1:19975".parse().unwrap();
        let _stratum = Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#;

        let response = String::from_utf8(dummy_request(&addr, request)).unwrap();

        assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":["dummy payload"],"id":2}"#), response);
    }

    #[test]
    fn can_authorize() {
        let addr = "127.0.0.1:19970".parse().unwrap();
        let manager = Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#));
        let stratum = Stratum::start(&addr, manager, None)
            .expect("There should be no error starting stratum");

        let request = r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#;
        let response = String::from_utf8(dummy_request(&addr, request)).unwrap();

        assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response);
        assert_eq!(1, stratum.implementation.workers.read().len());
    }

    #[test]
    fn can_push_work() {
        let _ = ::env_logger::try_init();

        let addr = "127.0.0.1:19995".parse().unwrap();
        let manager = Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#));
        let stratum = Stratum::start(&addr, manager, None)
            .expect("There should be no error starting stratum");

        let mut auth_request =
            r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#
            .as_bytes()
            .to_vec();
        auth_request.extend(b"\n");

        let auth_response = "{\"jsonrpc\":\"2.0\",\"result\":true,\"id\":1}\n";

        let mut runtime = Runtime::new().expect("Tokio Runtime should be created with no errors");
        // Sized to hold exactly the authorization reply, so `read_exact`
        // returns as soon as that reply has arrived.
        let auth_buf = vec![0u8; auth_response.len()];
        let work_buf = Vec::with_capacity(2048);
        let session = TcpStream::connect(&addr)
            // 1. Authorize over a connection that stays open afterwards.
            .and_then(move |stream| {
                io::write_all(stream, auth_request)
            })
            .and_then(|(stream, _)| {
                io::read_exact(stream, auth_buf)
            })
            .map_err(|err| panic!("{:?}", err))
            .and_then(move |(stream, auth_buf)| {
                assert_eq!(String::from_utf8(auth_buf).unwrap(), auth_response);
                trace!(target: "stratum", "Received authorization confirmation");
                Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
            })
            .map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
            // 2. Push work from the server side while the worker is connected.
            .and_then(move |stream| {
                trace!(target: "stratum", "Pusing work to peers");
                stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned());
                Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
            })
            .map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
            // 3. Shut down the write half and read whatever the server sent.
            .and_then(|stream| {
                trace!(target: "stratum", "Ready to read work from server");
                stream.shutdown(Shutdown::Write).unwrap();
                io::read_to_end(stream, work_buf)
            })
            .and_then(|(_, work_buf)| {
                trace!(target: "stratum", "Received work from server");
                future::ok(work_buf)
            });
        let response = String::from_utf8(
            runtime.block_on(session).expect("Runtime should run with no errors")
        ).expect("Response should be utf-8");

        // The first notification carries id NOTIFY_COUNTER_INITIAL + 1.
        assert_eq!(
            "{ \"id\": 17, \"method\": \"mining.notify\", \"params\": { \"00040008\", \"100500\" } }\n",
            response);
    }

    #[test]
    fn jsonprc_server_is_send_and_sync() {
        fn is_send_and_sync<T: Send + Sync>() {}

        is_send_and_sync::<JsonRpcServer>();
    }
}