Skip to main content

ckb_miner/
client.rs

1use crate::Work;
2use base64::Engine;
3use ckb_app_config::MinerClientConfig;
4use ckb_async_runtime::Handle;
5use ckb_channel::Sender;
6use ckb_jsonrpc_types::{Block as JsonBlock, BlockTemplate};
7use ckb_logger::{debug, error, info};
8use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
9use ckb_types::{
10    H256,
11    packed::{Block, Byte32},
12};
13use futures::prelude::*;
14use http_body_util::{BodyExt, Empty, Full};
15use hyper::{
16    Error as HyperError, Request, Response, Uri,
17    body::{Buf, Bytes},
18    header::{CONTENT_TYPE, HeaderValue},
19    service::service_fn,
20};
21use hyper_util::{
22    client::legacy::{Client as HttpClient, Error as ClientError},
23    rt::TokioExecutor,
24    server::{conn::auto, graceful::GracefulShutdown},
25};
26use jsonrpc_core::{
27    error::Error as RpcFail, error::ErrorCode as RpcFailCode, id::Id, params::Params,
28    request::MethodCall, response::Output, version::Version,
29};
30use serde_json::error::Error as JsonError;
31use serde_json::{self, Value, json};
32use std::net::SocketAddr;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::{convert::Into, time};
36use tokio::{
37    net::TcpListener,
38    sync::{mpsc, oneshot},
39};
40
41type RpcRequest = (oneshot::Sender<Result<Bytes, RpcError>>, MethodCall);
42
43#[derive(Debug)]
44pub enum RpcError {
45    Http(HyperError),
46    Client(ClientError),
47    Canceled, //oneshot canceled
48    Json(JsonError),
49    Fail(RpcFail),
50    SendError,
51    NoRespData,
52}
53
54#[derive(Debug, Clone)]
55pub struct Rpc {
56    sender: mpsc::Sender<RpcRequest>,
57}
58
59impl Rpc {
60    pub fn new(url: Uri, handle: Handle) -> Rpc {
61        let (sender, mut receiver) = mpsc::channel(65_535);
62        let stop_rx: CancellationToken = new_tokio_exit_rx();
63
64        let https = hyper_tls::HttpsConnector::new();
65        let client = HttpClient::builder(TokioExecutor::new()).build::<_, Full<Bytes>>(https);
66        let loop_handle = handle.clone();
67        handle.spawn(async move {
68            loop {
69                tokio::select! {
70                    Some(item) = receiver.recv() => {
71                        let (sender, call): RpcRequest = item;
72                        let req_url = url.clone();
73                        let request_json = serde_json::to_vec(&call).expect("valid rpc call");
74
75                        let mut req = Request::builder().uri(req_url).method("POST").header(CONTENT_TYPE, "application/json");
76
77                        if let Some(value) = parse_authorization(&url) {
78                            req = req
79                                .header(hyper::header::AUTHORIZATION, value);
80                        }
81                        let req = req.body(Full::new(Bytes::from(request_json))).unwrap();
82                        let client = client.clone();
83                        loop_handle.spawn(async move {
84                            let request = match client
85                                .request(req)
86                                .await
87                                .map(|res|res.into_body())
88                            {
89                                Ok(body) => BodyExt::collect(body).await.map_err(RpcError::Http).map(|t| t.to_bytes()),
90                                Err(err) => Err(RpcError::Client(err)),
91                            };
92                            if sender.send(request).is_err() {
93                                error!("rpc response send back error")
94                            }
95                        });
96                    },
97                    _ = stop_rx.cancelled() => {
98                        info!("Rpc server received exit signal, exit now");
99                        break
100                    },
101                    else => break
102                }
103            }
104        });
105
106        Rpc { sender }
107    }
108
109    pub fn request(
110        self,
111        method: String,
112        params: Vec<Value>,
113    ) -> impl Future<Output = Result<Output, RpcError>> {
114        let (tx, rev) = oneshot::channel();
115
116        let call = MethodCall {
117            method,
118            params: Params::Array(params),
119            jsonrpc: Some(Version::V2),
120            id: Id::Num(0),
121        };
122
123        let req = (tx, call);
124        let sender = self.sender;
125        async move {
126            sender
127                .clone()
128                .send(req)
129                .map_err(|_| RpcError::SendError)
130                .await?;
131            rev.map_err(|_| RpcError::Canceled)
132                .await?
133                .and_then(|chunk| serde_json::from_slice(&chunk).map_err(RpcError::Json))
134        }
135    }
136}
137
138pub enum Works {
139    New(Work),
140    FailSubmit(Byte32),
141}
142
143/// Mining client that fetches block templates and submits solutions.
144#[derive(Debug, Clone)]
145pub struct Client {
146    /// Current work ID being processed.
147    pub current_work_id: Arc<AtomicU64>,
148    /// Channel sender for new work notifications.
149    pub new_work_tx: Sender<Works>,
150    /// Miner client configuration.
151    pub config: MinerClientConfig,
152    /// RPC client for communicating with the CKB node.
153    pub rpc: Rpc,
154    handle: Handle,
155}
156
157impl Client {
158    /// Construct new Client
159    pub fn new(new_work_tx: Sender<Works>, config: MinerClientConfig, handle: Handle) -> Client {
160        let uri: Uri = config.rpc_url.parse().expect("valid rpc url");
161
162        Client {
163            current_work_id: Arc::new(AtomicU64::new(0)),
164            rpc: Rpc::new(uri, handle.clone()),
165            new_work_tx,
166            config,
167            handle,
168        }
169    }
170
171    fn send_submit_block_request(
172        &self,
173        work_id: &str,
174        block: Block,
175    ) -> impl Future<Output = Result<Output, RpcError>> + 'static + Send {
176        let block: JsonBlock = block.into();
177        let method = "submit_block".to_owned();
178        let params = vec![json!(work_id), json!(block)];
179
180        self.rpc.clone().request(method, params)
181    }
182
183    pub(crate) fn submit_block(&self, work_id: &str, block: Block) -> Result<(), RpcError> {
184        let parent = block.header().raw().parent_hash();
185        let future = self
186            .send_submit_block_request(work_id, block)
187            .and_then(parse_response::<H256>);
188
189        if self.config.block_on_submit {
190            self.handle.block_on(future).map(|_| ())
191        } else {
192            let sender = self.new_work_tx.clone();
193            self.handle.spawn(async move {
194                if let Err(e) = future.await {
195                    error!("rpc call submit_block error: {:?}", e);
196                    sender.send(Works::FailSubmit(parent)).unwrap()
197                }
198            });
199            Ok(())
200        }
201    }
202
203    /// spawn background update process
204    pub fn spawn_background(self) {
205        let client = self.clone();
206        if let Some(addr) = self.config.listen {
207            ckb_logger::info!("listen notify mode : {}", addr);
208            ckb_logger::info!(
209                r#"
210Please note that ckb-miner runs in notify mode. \
211You should configure the corresponding information in CKB block assembler, \
212for example:
213
214[block_assembler]
215...
216notify = ["http://{}"]
217
218Otherwise ckb-miner will malfunction and stop submitting valid blocks after a certain period.
219"#,
220                addr
221            );
222            self.handle.spawn(async move {
223                client.listen_block_template_notify(addr).await;
224            });
225            self.blocking_fetch_block_template();
226        } else {
227            ckb_logger::info!("loop poll mode: interval {}ms", self.config.poll_interval);
228            self.handle.spawn(async move {
229                client.poll_block_template().await;
230            });
231        }
232    }
233
234    async fn listen_block_template_notify(&self, addr: SocketAddr) {
235        let listener = TcpListener::bind(addr).await.unwrap();
236        let server = auto::Builder::new(TokioExecutor::new());
237        let graceful = GracefulShutdown::new();
238        let stop_rx: CancellationToken = new_tokio_exit_rx();
239
240        loop {
241            let client = self.clone();
242            let handle = service_fn(move |req| handle(client.clone(), req));
243            tokio::select! {
244                conn = listener.accept() => {
245                    let (stream, _) = match conn {
246                        Ok(conn) => conn,
247                        Err(e) => {
248                            info!("accept error: {}", e);
249                            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250                            continue;
251                        }
252                    };
253                    let stream = hyper_util::rt::TokioIo::new(Box::pin(stream));
254                    let conn = server.serve_connection_with_upgrades(stream, handle);
255
256                    let conn = graceful.watch(conn.into_owned());
257                    tokio::spawn(async move {
258                        if let Err(err) = conn.await {
259                            info!("connection error: {}", err);
260                        }
261                    });
262                },
263                _ = stop_rx.cancelled() => {
264                    info!("Miner client received exit signal. Exit now");
265                    break;
266                }
267            }
268        }
269        drop(listener);
270        graceful.shutdown().await;
271    }
272
273    async fn poll_block_template(&self) {
274        let poll_interval = time::Duration::from_millis(self.config.poll_interval);
275        let mut interval = tokio::time::interval(poll_interval);
276        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
277        let stop_rx: CancellationToken = new_tokio_exit_rx();
278        loop {
279            tokio::select! {
280                _ = interval.tick() => {
281                    debug!("poll block template...");
282                    self.fetch_block_template().await;
283                }
284                _ = stop_rx.cancelled() => {
285                    info!("Miner client pool_block_template received exit signal, exit now");
286                    break
287                },
288                else => break,
289            }
290        }
291    }
292
293    fn update_block_template(&self, block_template: BlockTemplate) {
294        let work_id = block_template.work_id.into();
295        let updated = |id| {
296            if id != work_id || id == 0 {
297                Some(work_id)
298            } else {
299                None
300            }
301        };
302        if self
303            .current_work_id
304            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, updated)
305            .is_ok()
306        {
307            let work: Work = block_template.into();
308            if let Err(e) = self.new_work_tx.send(Works::New(work)) {
309                error!("notify_new_block error: {:?}", e);
310            }
311        }
312    }
313
314    pub(crate) fn blocking_fetch_block_template(&self) {
315        self.handle.block_on(self.fetch_block_template())
316    }
317
318    async fn fetch_block_template(&self) {
319        match self.get_block_template().await {
320            Ok(block_template) => {
321                self.update_block_template(block_template);
322            }
323            Err(ref err) => {
324                let is_method_not_found = if let RpcError::Fail(RpcFail { code, .. }) = err {
325                    *code == RpcFailCode::MethodNotFound
326                } else {
327                    false
328                };
329                if is_method_not_found {
330                    error!(
331                        "RPC Method Not Found: \
332                         Please perform the following checks: \
333                         1. Ensure that the CKB server has enabled the Miner API module; \
334                         2. Verify that the CKB server has set the `block_assembler` correctly; \
335                         3. Confirm that the RPC URL for CKB miner is correct.",
336                    );
337                } else {
338                    error!("rpc call get_block_template error: {:?}", err);
339                }
340            }
341        }
342    }
343
344    async fn get_block_template(&self) -> Result<BlockTemplate, RpcError> {
345        let method = "get_block_template".to_owned();
346        let params = vec![];
347
348        self.rpc
349            .clone()
350            .request(method, params)
351            .and_then(parse_response)
352            .await
353    }
354}
355
356type Error = Box<dyn std::error::Error + Send + Sync>;
357
358async fn handle(
359    client: Client,
360    req: Request<hyper::body::Incoming>,
361) -> Result<Response<Empty<Bytes>>, Error> {
362    let body = BodyExt::collect(req).await?.aggregate();
363
364    if let Ok(template) = serde_json::from_reader(body.reader()) {
365        client.update_block_template(template);
366    }
367
368    Ok(Response::new(Empty::new()))
369}
370
371async fn parse_response<T: serde::de::DeserializeOwned>(output: Output) -> Result<T, RpcError> {
372    match output {
373        Output::Success(success) => {
374            serde_json::from_value::<T>(success.result).map_err(RpcError::Json)
375        }
376        Output::Failure(failure) => Err(RpcError::Fail(failure.error)),
377    }
378}
379
380fn parse_authorization(url: &Uri) -> Option<HeaderValue> {
381    let a: Vec<&str> = url.authority()?.as_str().split('@').collect();
382    if a.len() >= 2 {
383        if a[0].is_empty() {
384            return None;
385        }
386        let mut encoded = "Basic ".to_string();
387        base64::prelude::BASE64_STANDARD.encode_string(a[0], &mut encoded);
388        let mut header = HeaderValue::from_str(&encoded).unwrap();
389        header.set_sensitive(true);
390        Some(header)
391    } else {
392        None
393    }
394}