1use crate::Work;
2use base64::Engine;
3use ckb_app_config::MinerClientConfig;
4use ckb_async_runtime::Handle;
5use ckb_channel::Sender;
6use ckb_jsonrpc_types::{Block as JsonBlock, BlockTemplate};
7use ckb_logger::{debug, error, info};
8use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
9use ckb_types::{
10 H256,
11 packed::{Block, Byte32},
12};
13use futures::prelude::*;
14use http_body_util::{BodyExt, Empty, Full};
15use hyper::{
16 Error as HyperError, Request, Response, Uri,
17 body::{Buf, Bytes},
18 header::{CONTENT_TYPE, HeaderValue},
19 service::service_fn,
20};
21use hyper_util::{
22 client::legacy::{Client as HttpClient, Error as ClientError},
23 rt::TokioExecutor,
24 server::{conn::auto, graceful::GracefulShutdown},
25};
26use jsonrpc_core::{
27 error::Error as RpcFail, error::ErrorCode as RpcFailCode, id::Id, params::Params,
28 request::MethodCall, response::Output, version::Version,
29};
30use serde_json::error::Error as JsonError;
31use serde_json::{self, Value, json};
32use std::net::SocketAddr;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::{convert::Into, time};
36use tokio::{
37 net::TcpListener,
38 sync::{mpsc, oneshot},
39};
40
/// A queued RPC call: the JSON-RPC method call together with the one-shot
/// sender on which the raw response body (or the error) is sent back.
type RpcRequest = (oneshot::Sender<Result<Bytes, RpcError>>, MethodCall);
42
/// Errors that can occur while performing a JSON-RPC call.
#[derive(Debug)]
pub enum RpcError {
    /// Collecting the HTTP response body failed.
    Http(HyperError),
    /// The HTTP client failed to perform the request.
    Client(ClientError),
    /// The response channel was closed before a response was delivered.
    Canceled,
    /// JSON deserialization of the response failed.
    Json(JsonError),
    /// The server answered with a JSON-RPC failure object.
    Fail(RpcFail),
    /// The call could not be queued on the dispatch channel.
    SendError,
    /// NOTE(review): not constructed anywhere in this section; presumably a
    /// response that carried no data. Confirm against other users.
    NoRespData,
}
53
/// Cloneable handle used to queue JSON-RPC calls for the background dispatch
/// loop started by `Rpc::new`.
#[derive(Debug, Clone)]
pub struct Rpc {
    /// Queue of pending calls consumed by the dispatch loop.
    sender: mpsc::Sender<RpcRequest>,
}
58
59impl Rpc {
60 pub fn new(url: Uri, handle: Handle) -> Rpc {
61 let (sender, mut receiver) = mpsc::channel(65_535);
62 let stop_rx: CancellationToken = new_tokio_exit_rx();
63
64 let https = hyper_tls::HttpsConnector::new();
65 let client = HttpClient::builder(TokioExecutor::new()).build::<_, Full<Bytes>>(https);
66 let loop_handle = handle.clone();
67 handle.spawn(async move {
68 loop {
69 tokio::select! {
70 Some(item) = receiver.recv() => {
71 let (sender, call): RpcRequest = item;
72 let req_url = url.clone();
73 let request_json = serde_json::to_vec(&call).expect("valid rpc call");
74
75 let mut req = Request::builder().uri(req_url).method("POST").header(CONTENT_TYPE, "application/json");
76
77 if let Some(value) = parse_authorization(&url) {
78 req = req
79 .header(hyper::header::AUTHORIZATION, value);
80 }
81 let req = req.body(Full::new(Bytes::from(request_json))).unwrap();
82 let client = client.clone();
83 loop_handle.spawn(async move {
84 let request = match client
85 .request(req)
86 .await
87 .map(|res|res.into_body())
88 {
89 Ok(body) => BodyExt::collect(body).await.map_err(RpcError::Http).map(|t| t.to_bytes()),
90 Err(err) => Err(RpcError::Client(err)),
91 };
92 if sender.send(request).is_err() {
93 error!("rpc response send back error")
94 }
95 });
96 },
97 _ = stop_rx.cancelled() => {
98 info!("Rpc server received exit signal, exit now");
99 break
100 },
101 else => break
102 }
103 }
104 });
105
106 Rpc { sender }
107 }
108
109 pub fn request(
110 self,
111 method: String,
112 params: Vec<Value>,
113 ) -> impl Future<Output = Result<Output, RpcError>> {
114 let (tx, rev) = oneshot::channel();
115
116 let call = MethodCall {
117 method,
118 params: Params::Array(params),
119 jsonrpc: Some(Version::V2),
120 id: Id::Num(0),
121 };
122
123 let req = (tx, call);
124 let sender = self.sender;
125 async move {
126 sender
127 .clone()
128 .send(req)
129 .map_err(|_| RpcError::SendError)
130 .await?;
131 rev.map_err(|_| RpcError::Canceled)
132 .await?
133 .and_then(|chunk| serde_json::from_slice(&chunk).map_err(RpcError::Json))
134 }
135 }
136}
137
/// Messages sent by the client over the work channel.
pub enum Works {
    /// A new unit of work built from a freshly accepted block template.
    New(Work),
    /// Submitting a block failed; carries the parent hash of that block.
    FailSubmit(Byte32),
}
142
/// Miner-side client that obtains block templates over RPC (by polling or by
/// listening for notifications) and submits blocks.
#[derive(Debug, Clone)]
pub struct Client {
    /// Work id of the most recently accepted block template (starts at 0).
    pub current_work_id: Arc<AtomicU64>,
    /// Channel on which new work and failed-submission notices are sent.
    pub new_work_tx: Sender<Works>,
    /// Miner client configuration (RPC URL, poll interval, listen address, ...).
    pub config: MinerClientConfig,
    /// Handle used to issue JSON-RPC calls.
    pub rpc: Rpc,
    /// Async runtime handle used to spawn tasks and block on futures.
    handle: Handle,
}
156
157impl Client {
158 pub fn new(new_work_tx: Sender<Works>, config: MinerClientConfig, handle: Handle) -> Client {
160 let uri: Uri = config.rpc_url.parse().expect("valid rpc url");
161
162 Client {
163 current_work_id: Arc::new(AtomicU64::new(0)),
164 rpc: Rpc::new(uri, handle.clone()),
165 new_work_tx,
166 config,
167 handle,
168 }
169 }
170
171 fn send_submit_block_request(
172 &self,
173 work_id: &str,
174 block: Block,
175 ) -> impl Future<Output = Result<Output, RpcError>> + 'static + Send {
176 let block: JsonBlock = block.into();
177 let method = "submit_block".to_owned();
178 let params = vec![json!(work_id), json!(block)];
179
180 self.rpc.clone().request(method, params)
181 }
182
183 pub(crate) fn submit_block(&self, work_id: &str, block: Block) -> Result<(), RpcError> {
184 let parent = block.header().raw().parent_hash();
185 let future = self
186 .send_submit_block_request(work_id, block)
187 .and_then(parse_response::<H256>);
188
189 if self.config.block_on_submit {
190 self.handle.block_on(future).map(|_| ())
191 } else {
192 let sender = self.new_work_tx.clone();
193 self.handle.spawn(async move {
194 if let Err(e) = future.await {
195 error!("rpc call submit_block error: {:?}", e);
196 sender.send(Works::FailSubmit(parent)).unwrap()
197 }
198 });
199 Ok(())
200 }
201 }
202
203 pub fn spawn_background(self) {
205 let client = self.clone();
206 if let Some(addr) = self.config.listen {
207 ckb_logger::info!("listen notify mode : {}", addr);
208 ckb_logger::info!(
209 r#"
210Please note that ckb-miner runs in notify mode. \
211You should configure the corresponding information in CKB block assembler, \
212for example:
213
214[block_assembler]
215...
216notify = ["http://{}"]
217
218Otherwise ckb-miner will malfunction and stop submitting valid blocks after a certain period.
219"#,
220 addr
221 );
222 self.handle.spawn(async move {
223 client.listen_block_template_notify(addr).await;
224 });
225 self.blocking_fetch_block_template();
226 } else {
227 ckb_logger::info!("loop poll mode: interval {}ms", self.config.poll_interval);
228 self.handle.spawn(async move {
229 client.poll_block_template().await;
230 });
231 }
232 }
233
234 async fn listen_block_template_notify(&self, addr: SocketAddr) {
235 let listener = TcpListener::bind(addr).await.unwrap();
236 let server = auto::Builder::new(TokioExecutor::new());
237 let graceful = GracefulShutdown::new();
238 let stop_rx: CancellationToken = new_tokio_exit_rx();
239
240 loop {
241 let client = self.clone();
242 let handle = service_fn(move |req| handle(client.clone(), req));
243 tokio::select! {
244 conn = listener.accept() => {
245 let (stream, _) = match conn {
246 Ok(conn) => conn,
247 Err(e) => {
248 info!("accept error: {}", e);
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 continue;
251 }
252 };
253 let stream = hyper_util::rt::TokioIo::new(Box::pin(stream));
254 let conn = server.serve_connection_with_upgrades(stream, handle);
255
256 let conn = graceful.watch(conn.into_owned());
257 tokio::spawn(async move {
258 if let Err(err) = conn.await {
259 info!("connection error: {}", err);
260 }
261 });
262 },
263 _ = stop_rx.cancelled() => {
264 info!("Miner client received exit signal. Exit now");
265 break;
266 }
267 }
268 }
269 drop(listener);
270 graceful.shutdown().await;
271 }
272
273 async fn poll_block_template(&self) {
274 let poll_interval = time::Duration::from_millis(self.config.poll_interval);
275 let mut interval = tokio::time::interval(poll_interval);
276 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
277 let stop_rx: CancellationToken = new_tokio_exit_rx();
278 loop {
279 tokio::select! {
280 _ = interval.tick() => {
281 debug!("poll block template...");
282 self.fetch_block_template().await;
283 }
284 _ = stop_rx.cancelled() => {
285 info!("Miner client pool_block_template received exit signal, exit now");
286 break
287 },
288 else => break,
289 }
290 }
291 }
292
293 fn update_block_template(&self, block_template: BlockTemplate) {
294 let work_id = block_template.work_id.into();
295 let updated = |id| {
296 if id != work_id || id == 0 {
297 Some(work_id)
298 } else {
299 None
300 }
301 };
302 if self
303 .current_work_id
304 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, updated)
305 .is_ok()
306 {
307 let work: Work = block_template.into();
308 if let Err(e) = self.new_work_tx.send(Works::New(work)) {
309 error!("notify_new_block error: {:?}", e);
310 }
311 }
312 }
313
314 pub(crate) fn blocking_fetch_block_template(&self) {
315 self.handle.block_on(self.fetch_block_template())
316 }
317
318 async fn fetch_block_template(&self) {
319 match self.get_block_template().await {
320 Ok(block_template) => {
321 self.update_block_template(block_template);
322 }
323 Err(ref err) => {
324 let is_method_not_found = if let RpcError::Fail(RpcFail { code, .. }) = err {
325 *code == RpcFailCode::MethodNotFound
326 } else {
327 false
328 };
329 if is_method_not_found {
330 error!(
331 "RPC Method Not Found: \
332 Please perform the following checks: \
333 1. Ensure that the CKB server has enabled the Miner API module; \
334 2. Verify that the CKB server has set the `block_assembler` correctly; \
335 3. Confirm that the RPC URL for CKB miner is correct.",
336 );
337 } else {
338 error!("rpc call get_block_template error: {:?}", err);
339 }
340 }
341 }
342 }
343
344 async fn get_block_template(&self) -> Result<BlockTemplate, RpcError> {
345 let method = "get_block_template".to_owned();
346 let params = vec![];
347
348 self.rpc
349 .clone()
350 .request(method, params)
351 .and_then(parse_response)
352 .await
353 }
354}
355
/// Boxed, thread-safe error type returned by the notification handler.
type Error = Box<dyn std::error::Error + Send + Sync>;
357
358async fn handle(
359 client: Client,
360 req: Request<hyper::body::Incoming>,
361) -> Result<Response<Empty<Bytes>>, Error> {
362 let body = BodyExt::collect(req).await?.aggregate();
363
364 if let Ok(template) = serde_json::from_reader(body.reader()) {
365 client.update_block_template(template);
366 }
367
368 Ok(Response::new(Empty::new()))
369}
370
371async fn parse_response<T: serde::de::DeserializeOwned>(output: Output) -> Result<T, RpcError> {
372 match output {
373 Output::Success(success) => {
374 serde_json::from_value::<T>(success.result).map_err(RpcError::Json)
375 }
376 Output::Failure(failure) => Err(RpcError::Fail(failure.error)),
377 }
378}
379
380fn parse_authorization(url: &Uri) -> Option<HeaderValue> {
381 let a: Vec<&str> = url.authority()?.as_str().split('@').collect();
382 if a.len() >= 2 {
383 if a[0].is_empty() {
384 return None;
385 }
386 let mut encoded = "Basic ".to_string();
387 base64::prelude::BASE64_STANDARD.encode_string(a[0], &mut encoded);
388 let mut header = HeaderValue::from_str(&encoded).unwrap();
389 header.set_sensitive(true);
390 Some(header)
391 } else {
392 None
393 }
394}