cn_stratum/client/
connection.rs

1// copyright 2017 Kaz Wesley
2
3//! session layer of a pool client
4
5use crate::message::{
6    ClientCommand, Credentials, ErrorReply, Job, JsonMessage, PoolCommand, PoolEvent,
7    PoolReply, PoolRequest, Share, WorkerId,
8};
9
10use serde_json;
11
12use std::convert::From;
13use std::default::Default;
14use std::io::{self, BufRead, BufReader, BufWriter, Write};
15use std::net::TcpStream;
16use std::time::Duration;
17
18use failure::Fail;
19use log::{debug, info, warn};
20use serde_derive::{Deserialize, Serialize};
21
/// Result of client operation.
///
/// Shorthand for `std::result::Result` with this module's `Error` as the error type.
pub type Result<T> = std::result::Result<T, Error>;
24
/// Id for matching our requests with server replies.
///
/// A thin wrapper around a `u32`. Ids are handed out by the connection's write half,
/// starting at 1 and wrapping around on overflow.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub struct RequestId(u32);
28
/// Write-end of a connection to a pool.
///
/// Owns the buffered socket and the counter used to number outgoing requests.
struct ClientWriter {
    /// Buffered write half of the TCP connection.
    stream: BufWriter<TcpStream>,
    /// Id that will be assigned to the next request sent.
    next_id: RequestId,
}
34
35impl ClientWriter {
36    fn new(stream: BufWriter<TcpStream>) -> Self {
37        ClientWriter {
38            stream,
39            next_id: RequestId(1),
40        }
41    }
42
43    fn alloc_id(&mut self) -> RequestId {
44        let id = self.next_id.0;
45        self.next_id.0 = id.wrapping_add(1);
46        RequestId(id)
47    }
48
49    fn send(&mut self, command: PoolCommand) -> Result<RequestId> {
50        let id = self.alloc_id();
51        serde_json::to_writer(&mut self.stream, &PoolRequest { id, command })?;
52        writeln!(&mut self.stream)?;
53        self.stream.flush()?;
54        Ok(id)
55    }
56}
57
/// Read-end of a connection to a pool.
pub struct PoolClientReader {
    /// Buffered read half of the TCP connection.
    stream: BufReader<TcpStream>,
    /// Text buffer that incoming lines are read into; reused from one read to the next.
    buf: String,
}
63
64impl PoolClientReader {
65    fn new(stream: BufReader<TcpStream>) -> PoolClientReader {
66        PoolClientReader {
67            stream,
68            buf: Default::default(),
69        }
70    }
71
72    pub fn read(&mut self) -> Result<Option<PoolEvent<RequestId>>> {
73        self.buf.clear();
74        if let Err(e) = self.stream.read_line(&mut self.buf) {
75            return match e.kind() {
76                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Ok(None),
77                _ => Err(Error::from(e)),
78            };
79        };
80        debug!("read() success: \"{}\"", &self.buf);
81        if self.buf.is_empty() {
82            return Err(Error::disconnected());
83        }
84        let msg: JsonMessage<_> = serde_json::from_str(&self.buf)?;
85        Ok(msg.body)
86    }
87}
88
/// Write-end of a logged-in connection to a pool.
///
/// Pairs the request writer with a worker id, which is attached to every keepalive
/// and share submission sent through it.
pub struct PoolClientWriter {
    /// Underlying request writer.
    writer: ClientWriter,
    /// Worker id included in outgoing commands.
    worker_id: WorkerId,
}
94
95impl PoolClientWriter {
96    fn new(writer: ClientWriter, worker_id: WorkerId) -> Self {
97        PoolClientWriter { writer, worker_id }
98    }
99
100    /// Send a keepalive message.
101    pub fn keepalive(&mut self) -> Result<RequestId> {
102        self.writer
103            .send(PoolCommand::KeepAlived { id: self.worker_id })
104    }
105
106    /// Submit a share.
107    pub fn submit(
108        &mut self,
109        job: &Job,
110        nonce: u32,
111        result: &[u8; 32],
112    ) -> Result<RequestId> {
113        self.writer.send(PoolCommand::Submit(Share {
114            worker_id: self.worker_id,
115            job_id: job.id(),
116            nonce,
117            result: *result,
118            algo: job.algo().map(|a| a.to_owned()).unwrap_or_else(String::new),
119        }))
120        // 1 PoolReply::StatusReply expected
121    }
122}
123
124/// synchronously login to server
125pub fn connect(
126    address: &str,
127    login: &str,
128    pass: &str,
129    agent: &str,
130    keepalive: Option<Duration>,
131) -> Result<(PoolClientWriter, Job, PoolClientReader)> {
132    let stream_r = TcpStream::connect(address)?;
133    let stream_w = stream_r.try_clone()?;
134
135    stream_w.set_nodelay(true)?;
136    let stream_w = BufWriter::with_capacity(1500, stream_w);
137    let mut writer = ClientWriter::new(stream_w);
138    let algo = vec!["cn/1".to_owned()];
139    let (login, pass, agent) = (login.to_owned(), pass.to_owned(), agent.to_owned());
140    let req_id = writer.send(PoolCommand::Login(Credentials {
141        login,
142        pass,
143        agent,
144        algo,
145    }))?;
146    debug!("login sent: {:?}", req_id);
147
148    stream_r.set_read_timeout(keepalive)?;
149    let stream_r = BufReader::with_capacity(1500, stream_r);
150    let mut reader = PoolClientReader::new(stream_r);
151    let (wid, job, status) = loop {
152        match reader.read()?.ok_or_else(Error::login_timed_out)? {
153            PoolEvent::PoolReply {
154                id,
155                error: None,
156                result: Some(PoolReply::Job(assignment)),
157            } => {
158                debug_assert_eq!(id, req_id);
159                let worker_id = assignment.worker_id();
160                let status = assignment.status().map(|x| x.to_owned());
161                let job = assignment.into_job();
162                break (worker_id, job, status);
163            }
164            PoolEvent::PoolReply { error: Some(e), .. } => return Err(Error(Error_::ErrorReply(e))),
165            PoolEvent::ClientCommand(ClientCommand::Job(_)) => {
166                warn!("ignoring job notification received during login");
167                continue;
168            }
169            _ => return Err(Error::login_unexpected_reply()),
170        };
171    };
172    info!("login successful: status \"{:?}\"", status);
173
174    let writer = PoolClientWriter::new(writer, wid);
175    Ok((writer, job, reader))
176}
177
178////////////////////
179// errors
180////////////////////
181
/// Error returned by pool client operations.
///
/// Wraps an `Error_` in a private field; its display output is that of the wrapped value.
#[derive(Fail, Debug)]
#[fail(display = "{}", _0)]
pub struct Error(Error_);
185
/// The specific failures a pool client operation can report.
#[derive(Fail, Debug)]
pub enum Error_ {
    /// An I/O operation on the connection failed.
    #[fail(display = "{}", _0)]
    IoError(#[cause] io::Error),
    /// A message could not be serialized or deserialized as JSON.
    #[fail(display = "{}", _0)]
    MessageError(#[cause] serde_json::Error),
    /// A read returned no data, i.e. the stream reached its end.
    #[fail(display = "disconnected")]
    Disconnected,
    /// A read timed out before the login reply arrived.
    #[fail(display = "read timeout during login")]
    LoginTimedOut,
    /// Something other than a login reply or job notification arrived during login.
    #[fail(display = "unexpected reply during login")]
    LoginUnexpectedReply,
    /// The server answered a request with an error.
    #[fail(display = "server reports error: {}", _0)]
    ErrorReply(ErrorReply),
}
201
202impl Error {
203    fn disconnected() -> Self {
204        Error(Error_::Disconnected)
205    }
206    fn login_timed_out() -> Self {
207        Error(Error_::LoginTimedOut)
208    }
209    fn login_unexpected_reply() -> Self {
210        Error(Error_::LoginUnexpectedReply)
211    }
212}
213
214impl From<io::Error> for Error {
215    fn from(error: io::Error) -> Self {
216        Error(Error_::IoError(error))
217    }
218}
219
220impl From<serde_json::Error> for Error {
221    fn from(error: serde_json::Error) -> Self {
222        Error(Error_::MessageError(error))
223    }
224}
225
226impl From<ErrorReply> for Error {
227    fn from(error: ErrorReply) -> Self {
228        Error(Error_::ErrorReply(error))
229    }
230}
231
232////////////////////
233// testing
234////////////////////
235
#[cfg(test)]
mod tests {
    use super::*;

    /// Sample reply to a login request: a result holding a job and a status, no error.
    const EXAMPLE_LOGINREPLY_STR: &str = concat!(
        r#"{"id":0,"jsonrpc":"2.0","result":{"id":"0","job":"#,
        r#"{"blob":"0606de93b8d0055f149bdc720d9b8928e51399dbc2f85b069aa10142fff7b8814a296424f3659"#,
        r#"00000000019be9ee931ce265444a4d5b599d1e463f1f7fbada6517218fe65aea3a73390a406","#,
        r#""job_id":"12022","target":"b7d10000"},"status":"OK"},"error":null}"#
    );

    /// Sample `job` notification pushed by the server.
    const EXAMPLE_JOBCOMMAND_STR: &str = concat!(
        r#"{"jsonrpc":"2.0","method":"job","params":"#,
        r#"{"blob":"06068795b8d0055b9272a308e09675e9c4c1510e84921e1ff0bfa13fc375eb8eec2207408205c"#,
        r#"000000000da5d4af05371b7bda75eef0d73cbbead3773006bd9117b1ca7dbcc9dacc1284d0d","#,
        r#""job_id":"12023","target":"b7d10000"}}"#
    );

    /// The sample login reply must deserialize as a pool event.
    #[test]
    fn deserialize_login_reply() {
        serde_json::from_str::<PoolEvent<u32>>(EXAMPLE_LOGINREPLY_STR).unwrap();
    }

    /// The sample job notification must deserialize as a pool event.
    #[test]
    fn deserialize_job_command() {
        serde_json::from_str::<PoolEvent<u32>>(EXAMPLE_JOBCOMMAND_STR).unwrap();
    }
}