1use crate::message::{
6 ClientCommand, Credentials, ErrorReply, Job, JsonMessage, PoolCommand, PoolEvent,
7 PoolReply, PoolRequest, Share, WorkerId,
8};
9
10use serde_json;
11
12use std::convert::From;
13use std::default::Default;
14use std::io::{self, BufRead, BufReader, BufWriter, Write};
15use std::net::TcpStream;
16use std::time::Duration;
17
18use failure::Fail;
19use log::{debug, info, warn};
20use serde_derive::{Deserialize, Serialize};
21
22pub type Result<T> = std::result::Result<T, Error>;
24
25#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
27pub struct RequestId(u32);
28
29struct ClientWriter {
31 stream: BufWriter<TcpStream>,
32 next_id: RequestId,
33}
34
35impl ClientWriter {
36 fn new(stream: BufWriter<TcpStream>) -> Self {
37 ClientWriter {
38 stream,
39 next_id: RequestId(1),
40 }
41 }
42
43 fn alloc_id(&mut self) -> RequestId {
44 let id = self.next_id.0;
45 self.next_id.0 = id.wrapping_add(1);
46 RequestId(id)
47 }
48
49 fn send(&mut self, command: PoolCommand) -> Result<RequestId> {
50 let id = self.alloc_id();
51 serde_json::to_writer(&mut self.stream, &PoolRequest { id, command })?;
52 writeln!(&mut self.stream)?;
53 self.stream.flush()?;
54 Ok(id)
55 }
56}
57
58pub struct PoolClientReader {
60 stream: BufReader<TcpStream>,
61 buf: String,
62}
63
64impl PoolClientReader {
65 fn new(stream: BufReader<TcpStream>) -> PoolClientReader {
66 PoolClientReader {
67 stream,
68 buf: Default::default(),
69 }
70 }
71
72 pub fn read(&mut self) -> Result<Option<PoolEvent<RequestId>>> {
73 self.buf.clear();
74 if let Err(e) = self.stream.read_line(&mut self.buf) {
75 return match e.kind() {
76 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Ok(None),
77 _ => Err(Error::from(e)),
78 };
79 };
80 debug!("read() success: \"{}\"", &self.buf);
81 if self.buf.is_empty() {
82 return Err(Error::disconnected());
83 }
84 let msg: JsonMessage<_> = serde_json::from_str(&self.buf)?;
85 Ok(msg.body)
86 }
87}
88
89pub struct PoolClientWriter {
91 writer: ClientWriter,
92 worker_id: WorkerId,
93}
94
95impl PoolClientWriter {
96 fn new(writer: ClientWriter, worker_id: WorkerId) -> Self {
97 PoolClientWriter { writer, worker_id }
98 }
99
100 pub fn keepalive(&mut self) -> Result<RequestId> {
102 self.writer
103 .send(PoolCommand::KeepAlived { id: self.worker_id })
104 }
105
106 pub fn submit(
108 &mut self,
109 job: &Job,
110 nonce: u32,
111 result: &[u8; 32],
112 ) -> Result<RequestId> {
113 self.writer.send(PoolCommand::Submit(Share {
114 worker_id: self.worker_id,
115 job_id: job.id(),
116 nonce,
117 result: *result,
118 algo: job.algo().map(|a| a.to_owned()).unwrap_or_else(String::new),
119 }))
120 }
122}
123
124pub fn connect(
126 address: &str,
127 login: &str,
128 pass: &str,
129 agent: &str,
130 keepalive: Option<Duration>,
131) -> Result<(PoolClientWriter, Job, PoolClientReader)> {
132 let stream_r = TcpStream::connect(address)?;
133 let stream_w = stream_r.try_clone()?;
134
135 stream_w.set_nodelay(true)?;
136 let stream_w = BufWriter::with_capacity(1500, stream_w);
137 let mut writer = ClientWriter::new(stream_w);
138 let algo = vec!["cn/1".to_owned()];
139 let (login, pass, agent) = (login.to_owned(), pass.to_owned(), agent.to_owned());
140 let req_id = writer.send(PoolCommand::Login(Credentials {
141 login,
142 pass,
143 agent,
144 algo,
145 }))?;
146 debug!("login sent: {:?}", req_id);
147
148 stream_r.set_read_timeout(keepalive)?;
149 let stream_r = BufReader::with_capacity(1500, stream_r);
150 let mut reader = PoolClientReader::new(stream_r);
151 let (wid, job, status) = loop {
152 match reader.read()?.ok_or_else(Error::login_timed_out)? {
153 PoolEvent::PoolReply {
154 id,
155 error: None,
156 result: Some(PoolReply::Job(assignment)),
157 } => {
158 debug_assert_eq!(id, req_id);
159 let worker_id = assignment.worker_id();
160 let status = assignment.status().map(|x| x.to_owned());
161 let job = assignment.into_job();
162 break (worker_id, job, status);
163 }
164 PoolEvent::PoolReply { error: Some(e), .. } => return Err(Error(Error_::ErrorReply(e))),
165 PoolEvent::ClientCommand(ClientCommand::Job(_)) => {
166 warn!("ignoring job notification received during login");
167 continue;
168 }
169 _ => return Err(Error::login_unexpected_reply()),
170 };
171 };
172 info!("login successful: status \"{:?}\"", status);
173
174 let writer = PoolClientWriter::new(writer, wid);
175 Ok((writer, job, reader))
176}
177
178#[derive(Fail, Debug)]
183#[fail(display = "{}", _0)]
184pub struct Error(Error_);
185
186#[derive(Fail, Debug)]
187pub enum Error_ {
188 #[fail(display = "{}", _0)]
189 IoError(#[cause] io::Error),
190 #[fail(display = "{}", _0)]
191 MessageError(#[cause] serde_json::Error),
192 #[fail(display = "disconnected")]
193 Disconnected,
194 #[fail(display = "read timeout during login")]
195 LoginTimedOut,
196 #[fail(display = "unexpected reply during login")]
197 LoginUnexpectedReply,
198 #[fail(display = "server reports error: {}", _0)]
199 ErrorReply(ErrorReply),
200}
201
202impl Error {
203 fn disconnected() -> Self {
204 Error(Error_::Disconnected)
205 }
206 fn login_timed_out() -> Self {
207 Error(Error_::LoginTimedOut)
208 }
209 fn login_unexpected_reply() -> Self {
210 Error(Error_::LoginUnexpectedReply)
211 }
212}
213
214impl From<io::Error> for Error {
215 fn from(error: io::Error) -> Self {
216 Error(Error_::IoError(error))
217 }
218}
219
220impl From<serde_json::Error> for Error {
221 fn from(error: serde_json::Error) -> Self {
222 Error(Error_::MessageError(error))
223 }
224}
225
226impl From<ErrorReply> for Error {
227 fn from(error: ErrorReply) -> Self {
228 Error(Error_::ErrorReply(error))
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Sample reply to a login request, carrying the first job.
    const EXAMPLE_LOGINREPLY_STR: &str = concat!(
        r#"{"id":0,"jsonrpc":"2.0","result":{"id":"0","job":"#,
        r#"{"blob":"0606de93b8d0055f149bdc720d9b8928e51399dbc2f85b069aa10142fff7b8814a296424f3659"#,
        r#"00000000019be9ee931ce265444a4d5b599d1e463f1f7fbada6517218fe65aea3a73390a406","#,
        r#""job_id":"12022","target":"b7d10000"},"status":"OK"},"error":null}"#
    );

    /// Sample `job` notification pushed by the server.
    const EXAMPLE_JOBCOMMAND_STR: &str = concat!(
        r#"{"jsonrpc":"2.0","method":"job","params":"#,
        r#"{"blob":"06068795b8d0055b9272a308e09675e9c4c1510e84921e1ff0bfa13fc375eb8eec2207408205c"#,
        r#"000000000da5d4af05371b7bda75eef0d73cbbead3773006bd9117b1ca7dbcc9dacc1284d0d","#,
        r#""job_id":"12023","target":"b7d10000"}}"#
    );

    /// The login reply sample must decode as a pool event.
    #[test]
    fn deserialize_login_reply() {
        serde_json::from_str::<PoolEvent<u32>>(EXAMPLE_LOGINREPLY_STR).unwrap();
    }

    /// The job notification sample must decode as a pool event.
    #[test]
    fn deserialize_job_command() {
        serde_json::from_str::<PoolEvent<u32>>(EXAMPLE_JOBCOMMAND_STR).unwrap();
    }
}