vapcore_stratum/
lib.rs

1// Copyright 2015-2020 Parity Technologies (UK) Ltd.
2// This file is part of Tetsy Vapory.
3
4// Tetsy Vapory is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Tetsy Vapory is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Tetsy Vapory.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Stratum protocol implementation for parity vapory/bitcoin clients
18
19extern crate tetsy_jsonrpc_tcp_server;
20extern crate tetsy_jsonrpc_core;
21extern crate vapory_types;
22extern crate tetsy_keccak_hash as hash;
23extern crate parking_lot;
24
25#[macro_use] extern crate log;
26
27#[cfg(test)] extern crate tokio;
28#[cfg(test)] extern crate tokio_io;
29#[cfg(test)] extern crate env_logger;
30
31mod traits;
32
33pub use traits::{
34	JobDispatcher, PushWorkHandler, Error, ServiceConfiguration,
35};
36
37use tetsy_jsonrpc_tcp_server::{
38	Server as JsonRpcServer, ServerBuilder as JsonRpcServerBuilder,
39	RequestContext, MetaExtractor, Dispatcher, PushMessageError,
40};
41use tetsy_jsonrpc_core::{MetaIoHandler, Params, to_value, Value, Metadata, Compatibility, IoDelegate};
42use std::sync::Arc;
43
44use std::net::SocketAddr;
45use std::collections::{HashSet, HashMap};
46use hash::keccak;
47use vapory_types::H256;
48use parking_lot::RwLock;
49
50type RpcResult = Result<tetsy_jsonrpc_core::Value, tetsy_jsonrpc_core::Error>;
51
52const NOTIFY_COUNTER_INITIAL: u32 = 16;
53
54/// Container which owns rpc server and stratum implementation
55pub struct Stratum {
56	/// RPC server
57	///
58	/// It is an `Option` so it can be easily closed and released during `drop` phase
59	rpc_server: Option<JsonRpcServer>,
60	/// stratum protocol implementation
61	///
62	/// It is owned by a container and rpc server
63	implementation: Arc<StratumImpl>,
64	/// Message dispatcher (tcp/ip service)
65	///
66	/// Used to push messages to peers
67	tcp_dispatcher: Dispatcher,
68}
69
70impl Stratum {
71	pub fn start(
72		addr: &SocketAddr,
73		dispatcher: Arc<dyn JobDispatcher>,
74		secret: Option<H256>,
75	) -> Result<Arc<Stratum>, Error> {
76
77		let implementation = Arc::new(StratumImpl {
78			subscribers: RwLock::default(),
79			job_queue: RwLock::default(),
80			dispatcher,
81			workers: Arc::new(RwLock::default()),
82			secret,
83			notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
84		});
85
86		let mut delegate = IoDelegate::<StratumImpl, SocketMetadata>::new(implementation.clone());
87		delegate.add_method_with_meta("mining.subscribe", StratumImpl::subscribe);
88		delegate.add_method_with_meta("mining.authorize", StratumImpl::authorize);
89		delegate.add_method_with_meta("mining.submit", StratumImpl::submit);
90		let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(Compatibility::Both);
91		handler.extend_with(delegate);
92
93		let server_builder = JsonRpcServerBuilder::new(handler);
94		let tcp_dispatcher = server_builder.dispatcher();
95		let server_builder = server_builder.session_meta_extractor(PeerMetaExtractor::new(tcp_dispatcher.clone()));
96		let server = server_builder.start(addr)?;
97
98		let stratum = Arc::new(Stratum {
99			rpc_server: Some(server),
100			implementation,
101			tcp_dispatcher,
102		});
103
104		Ok(stratum)
105	}
106}
107
108impl PushWorkHandler for Stratum {
109	fn push_work_all(&self, payload: String) {
110		self.implementation.push_work_all(payload, &self.tcp_dispatcher)
111	}
112}
113
114impl Drop for Stratum {
115	fn drop(&mut self) {
116		// shut down rpc server
117		self.rpc_server.take().map(|server| server.close());
118	}
119}
120
121struct StratumImpl {
122	/// Subscribed clients
123	subscribers: RwLock<Vec<SocketAddr>>,
124	/// List of workers supposed to receive job update
125	job_queue: RwLock<HashSet<SocketAddr>>,
126	/// Payload manager
127	dispatcher: Arc<dyn JobDispatcher>,
128	/// Authorized workers (socket - worker_id)
129	workers: Arc<RwLock<HashMap<SocketAddr, String>>>,
130	/// Secret if any
131	secret: Option<H256>,
132	/// Dispatch notify counter
133	notify_counter: RwLock<u32>,
134}
135
136impl StratumImpl {
137	/// rpc method `mining.subscribe`
138	fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult {
139		use std::str::FromStr;
140
141		self.subscribers.write().push(meta.addr().clone());
142		self.job_queue.write().insert(meta.addr().clone());
143		trace!(target: "stratum", "Subscription request from {:?}", meta.addr());
144
145		Ok(match self.dispatcher.initial() {
146			Some(initial) => match tetsy_jsonrpc_core::Value::from_str(&initial) {
147				Ok(val) => Ok(val),
148				Err(e) => {
149					warn!(target: "stratum", "Invalid payload: '{}' ({:?})", &initial, e);
150					to_value(&[0u8; 0])
151				},
152			},
153			None => to_value(&[0u8; 0]),
154		}.expect("Empty slices are serializable; qed"))
155	}
156
157	/// rpc method `mining.authorize`
158	fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
159		params.parse::<(String, String)>().map(|(worker_id, secret)| {
160			if let Some(valid_secret) = self.secret {
161				let hash = keccak(secret);
162				if hash != valid_secret {
163					return to_value(&false);
164				}
165			}
166			trace!(target: "stratum", "New worker #{} registered", worker_id);
167			self.workers.write().insert(meta.addr().clone(), worker_id);
168			to_value(true)
169		}).map(|v| v.expect("Only true/false is returned and it's always serializable; qed"))
170	}
171
172	/// rpc method `mining.submit`
173	fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult {
174		Ok(match params {
175			Params::Array(vals) => {
176				// first two elements are service messages (worker_id & job_id)
177				match self.dispatcher.submit(vals.iter().skip(2)
178					.filter_map(|val| match *val {
179						Value::String(ref s) => Some(s.to_owned()),
180						_ => None
181					})
182					.collect::<Vec<String>>()) {
183					Ok(()) => {
184						self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed"));
185						to_value(true)
186					},
187					Err(submit_err) => {
188						warn!("Error while submitting share: {:?}", submit_err);
189						to_value(false)
190					}
191				}
192			},
193			_ => {
194				trace!(target: "stratum", "Invalid submit work format {:?}", params);
195				to_value(false)
196			}
197		}.expect("Only true/false is returned and it's always serializable; qed"))
198	}
199
200	/// Helper method
201	fn update_peers(&self, tcp_dispatcher: &Dispatcher) {
202		if let Some(job) = self.dispatcher.job() {
203			self.push_work_all(job, tcp_dispatcher)
204		}
205	}
206
207	fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) {
208		let hup_peers = {
209			let workers = self.workers.read();
210			let next_request_id = {
211				let mut counter = self.notify_counter.write();
212				if *counter == ::std::u32::MAX {
213					*counter = NOTIFY_COUNTER_INITIAL;
214				} else {
215					*counter = *counter + 1
216				}
217				*counter
218			};
219
220			let mut hup_peers = HashSet::new();
221			let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload);
222			trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
223			for (addr, _) in workers.iter() {
224				trace!(target: "stratum", "pushing work to {}", addr);
225				match tcp_dispatcher.push_message(addr, workers_msg.clone()) {
226					Err(PushMessageError::NoSuchPeer) => {
227						trace!(target: "stratum", "Worker no longer connected: {}", addr);
228						hup_peers.insert(addr.clone());
229					},
230					Err(e) => {
231						warn!(target: "stratum", "Unexpected transport error: {:?}", e);
232					},
233					Ok(_) => {},
234				}
235			}
236			hup_peers
237		};
238
239		if !hup_peers.is_empty() {
240			let mut workers = self.workers.write();
241			for hup_peer in hup_peers {
242				workers.remove(&hup_peer);
243			}
244		}
245	}
246}
247
248#[derive(Clone)]
249pub struct SocketMetadata {
250	addr: SocketAddr,
251	// with the new version of tetsy-jsonrpc-core, SocketMetadata
252	// won't have to implement default, so this field will not
253	// have to be an Option
254	tcp_dispatcher: Option<Dispatcher>,
255}
256
257impl Default for SocketMetadata {
258	fn default() -> Self {
259		SocketMetadata {
260			addr: "0.0.0.0:0".parse().unwrap(),
261			tcp_dispatcher: None,
262		}
263	}
264}
265
266impl SocketMetadata {
267	pub fn addr(&self) -> &SocketAddr {
268		&self.addr
269	}
270}
271
272impl Metadata for SocketMetadata { }
273
274pub struct PeerMetaExtractor {
275	tcp_dispatcher: Dispatcher,
276}
277
278impl PeerMetaExtractor {
279	fn new(tcp_dispatcher: Dispatcher) -> Self {
280		PeerMetaExtractor {
281			tcp_dispatcher,
282		}
283	}
284}
285
286impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
287	fn extract(&self, context: &RequestContext) -> SocketMetadata {
288		SocketMetadata {
289			addr: context.peer_addr,
290			tcp_dispatcher: Some(self.tcp_dispatcher.clone()),
291		}
292	}
293}
294
295#[cfg(test)]
296mod tests {
297	use super::*;
298	use std::net::{SocketAddr, Shutdown};
299	use std::sync::Arc;
300
301	use tokio::{io, runtime::Runtime, timer::timeout::{self, Timeout}, net::TcpStream};
302	use tetsy_jsonrpc_core::futures::{Future, future};
303
304	pub struct VoidManager;
305
306	impl JobDispatcher for VoidManager {
307		fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
308			Ok(())
309		}
310	}
311
312	fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
313		let mut runtime = Runtime::new().expect("Tokio Runtime should be created with no errors");
314
315		let mut data_vec = data.as_bytes().to_vec();
316		data_vec.extend(b"\n");
317
318		let stream = TcpStream::connect(addr)
319			.and_then(move |stream| {
320				io::write_all(stream, data_vec)
321			})
322			.and_then(|(stream, _)| {
323				stream.shutdown(Shutdown::Write).unwrap();
324				io::read_to_end(stream, Vec::with_capacity(2048))
325			})
326			.and_then(|(_stream, read_buf)| {
327				future::ok(read_buf)
328			});
329			let result = runtime.block_on(stream).expect("Runtime should run with no errors");
330
331			result
332	}
333
334	#[test]
335	fn can_be_started() {
336		let stratum = Stratum::start(&"127.0.0.1:19980".parse().unwrap(), Arc::new(VoidManager), None);
337		assert!(stratum.is_ok());
338	}
339
340	#[test]
341	fn records_subscriber() {
342		let _ = ::env_logger::try_init();
343
344		let addr = "127.0.0.1:19985".parse().unwrap();
345		let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap();
346		let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#;
347		dummy_request(&addr, request);
348		assert_eq!(1, stratum.implementation.subscribers.read().len());
349	}
350
351	struct DummyManager {
352		initial_payload: String
353	}
354
355	impl DummyManager {
356		fn new() -> Arc<DummyManager> {
357			Arc::new(Self::build())
358		}
359
360		fn build() -> DummyManager {
361			DummyManager { initial_payload: r#"[ "dummy payload" ]"#.to_owned() }
362		}
363
364		fn of_initial(mut self, new_initial: &str) -> DummyManager {
365			self.initial_payload = new_initial.to_owned();
366			self
367		}
368	}
369
370	impl JobDispatcher for DummyManager {
371		fn initial(&self) -> Option<String> {
372			Some(self.initial_payload.clone())
373		}
374
375		fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
376			Ok(())
377		}
378	}
379
380	fn terminated_str(origin: &'static str) -> String {
381		let mut s = String::new();
382		s.push_str(origin);
383		s.push_str("\n");
384		s
385	}
386
387	#[test]
388	fn receives_initial_payload() {
389		let addr = "127.0.0.1:19975".parse().unwrap();
390		let _stratum = Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
391		let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#;
392
393		let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
394
395		assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":["dummy payload"],"id":2}"#), response);
396	}
397
398	#[test]
399	fn can_authorize() {
400		let addr = "127.0.0.1:19970".parse().unwrap();
401		let stratum = Stratum::start(
402			&addr,
403			Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)),
404			None
405		).expect("There should be no error starting stratum");
406
407		let request = r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#;
408		let response = String::from_utf8(dummy_request(&addr, request)).unwrap();
409
410		assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response);
411		assert_eq!(1, stratum.implementation.workers.read().len());
412	}
413
414	#[test]
415	fn can_push_work() {
416		let _ = ::env_logger::try_init();
417
418		let addr = "127.0.0.1:19995".parse().unwrap();
419		let stratum = Stratum::start(
420			&addr,
421			Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)),
422			None
423		).expect("There should be no error starting stratum");
424
425		let mut auth_request =
426			r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#
427			.as_bytes()
428			.to_vec();
429		auth_request.extend(b"\n");
430
431		let auth_response = "{\"jsonrpc\":\"2.0\",\"result\":true,\"id\":1}\n";
432
433		let mut runtime = Runtime::new().expect("Tokio Runtime should be created with no errors");
434		let read_buf0 = vec![0u8; auth_response.len()];
435		let read_buf1 = Vec::with_capacity(2048);
436		let stream = TcpStream::connect(&addr)
437			.and_then(move |stream| {
438				io::write_all(stream, auth_request)
439			})
440			.and_then(|(stream, _)| {
441				io::read_exact(stream, read_buf0)
442			})
443			.map_err(|err| panic!("{:?}", err))
444			.and_then(move |(stream, read_buf0)| {
445				assert_eq!(String::from_utf8(read_buf0).unwrap(), auth_response);
446				trace!(target: "stratum", "Received authorization confirmation");
447				Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
448			})
449			.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
450			.and_then(move |stream| {
451				trace!(target: "stratum", "Pusing work to peers");
452				stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned());
453				Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
454			})
455			.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
456			.and_then(|stream| {
457				trace!(target: "stratum", "Ready to read work from server");
458				stream.shutdown(Shutdown::Write).unwrap();
459				io::read_to_end(stream, read_buf1)
460			})
461			.and_then(|(_, read_buf1)| {
462				trace!(target: "stratum", "Received work from server");
463				future::ok(read_buf1)
464			});
465		let response = String::from_utf8(
466			runtime.block_on(stream).expect("Runtime should run with no errors")
467		).expect("Response should be utf-8");
468
469		assert_eq!(
470			"{ \"id\": 17, \"method\": \"mining.notify\", \"params\": { \"00040008\", \"100500\" } }\n",
471			response);
472	}
473
474	#[test]
475	fn jsonprc_server_is_send_and_sync() {
476		fn is_send_and_sync<T: Send + Sync>() {}
477
478		is_send_and_sync::<JsonRpcServer>();
479	}
480}