cn_stratum/client/
mod.rs

1// copyright 2017 Kaz Wesley
2
3//! Pool client implementation. Client API is synchronous because users will not normally want to
4//! multiplex lots of upstreams.
5
6mod connection;
7
8use self::connection::PoolClientReader;
9use crate::message::{ClientCommand, PoolEvent, PoolReply};
10
11pub use self::connection::{PoolClientWriter, RequestId, Result};
12pub use crate::message::{ErrorReply, Job, JobAssignment};
13
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16
17use log::*;
18
19/// Trait for objects that can handle messages received from the pool.
20pub trait MessageHandler {
21    fn job_command(&mut self, job: Job);
22    fn error_reply(&mut self, id: RequestId, error: ErrorReply);
23    fn status_reply(&mut self, id: RequestId, status: String);
24    fn job_reply(&mut self, id: RequestId, job: Box<JobAssignment>);
25}
26
27/// A synchronous stratum pool client, customized with a MessageHandler.
28pub struct PoolClient<H> {
29    writer: Arc<Mutex<PoolClientWriter>>,
30    reader: PoolClientReader,
31    handler: H,
32}
33
34impl<H: MessageHandler> PoolClient<H> {
35    /// Synchronously connect to the server; pass the initial job to a MessageHandler constructor.
36    pub fn connect<F>(
37        address: &str,
38        login: &str,
39        pass: &str,
40        keepalive: Option<Duration>,
41        agent: &str,
42        make_handler: F,
43    ) -> Result<Self>
44    where
45        F: FnOnce(Job) -> H,
46    {
47        let (writer, work, reader) = connection::connect(address, login, pass, agent, keepalive)?;
48        debug!("client connected, initial job: {:?}", &work);
49        let writer = Arc::new(Mutex::new(writer));
50        let handler = make_handler(work);
51        Ok(PoolClient {
52            writer,
53            reader,
54            handler,
55        })
56    }
57
58    /// Return a new handle to the write end of the client connection.
59    pub fn write_handle(&self) -> Arc<Mutex<PoolClientWriter>> {
60        Arc::clone(&self.writer)
61    }
62
63    /// Borrow the message handler that was created in connect().
64    pub fn handler(&self) -> &H {
65        &self.handler
66    }
67
68    /// Handle messages until the connection is closed.
69    pub fn run(mut self) -> Result<()> {
70        loop {
71            let event = if let Some(event) = self.reader.read()? {
72                event
73            } else {
74                debug!("read timeout; sending keepalive");
75                self.writer.lock().unwrap().keepalive().unwrap();
76                continue;
77            };
78            match event {
79                PoolEvent::ClientCommand(ClientCommand::Job(j)) => self.handler.job_command(j),
80                PoolEvent::PoolReply {
81                    id,
82                    error: Some(error),
83                    ..
84                } => self.handler.error_reply(id, error),
85                PoolEvent::PoolReply {
86                    id,
87                    error: None,
88                    result: Some(PoolReply::Status { status }),
89                } => self.handler.status_reply(id, status),
90                PoolEvent::PoolReply {
91                    id,
92                    error: None,
93                    result: Some(PoolReply::Job(job)),
94                } => self.handler.job_reply(id, job),
95                PoolEvent::PoolReply {
96                    error: None,
97                    result: None,
98                    ..
99                } => warn!("pool reply with no content"),
100            }
101        }
102    }
103}