soil_client/import/queue/
buffered_link.rs1use super::{JustificationImportResult, Link, RuntimeOrigin};
30use crate::utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
31use futures::prelude::*;
32use std::{
33 pin::Pin,
34 task::{Context, Poll},
35};
36use subsoil::runtime::traits::{Block as BlockT, NumberFor};
37
38use super::BlockImportResult;
39
40pub fn buffered_link<B: BlockT>(
44 queue_size_warning: usize,
45) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
46 let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
47 let tx = BufferedLinkSender { tx };
48 let rx = BufferedLinkReceiver { rx: rx.fuse() };
49 (tx, rx)
50}
51
52pub struct BufferedLinkSender<B: BlockT> {
54 tx: TracingUnboundedSender<BlockImportWorkerMsg<B>>,
55}
56
57impl<B: BlockT> BufferedLinkSender<B> {
58 pub fn is_closed(&self) -> bool {
62 self.tx.is_closed()
63 }
64}
65
66impl<B: BlockT> Clone for BufferedLinkSender<B> {
67 fn clone(&self) -> Self {
68 BufferedLinkSender { tx: self.tx.clone() }
69 }
70}
71
72pub enum BlockImportWorkerMsg<B: BlockT> {
74 BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
75 JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, JustificationImportResult),
76 RequestJustification(B::Hash, NumberFor<B>),
77}
78
79impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
80 fn blocks_processed(
81 &self,
82 imported: usize,
83 count: usize,
84 results: Vec<(BlockImportResult<B>, B::Hash)>,
85 ) {
86 let _ = self
87 .tx
88 .unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
89 }
90
91 fn justification_imported(
92 &self,
93 who: RuntimeOrigin,
94 hash: &B::Hash,
95 number: NumberFor<B>,
96 import_result: JustificationImportResult,
97 ) {
98 let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, import_result);
99 let _ = self.tx.unbounded_send(msg);
100 }
101
102 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
103 let _ = self
104 .tx
105 .unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number));
106 }
107}
108
109pub struct BufferedLinkReceiver<B: BlockT> {
111 rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
112}
113
114impl<B: BlockT> BufferedLinkReceiver<B> {
115 pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &dyn Link<B>) {
117 match msg {
118 BlockImportWorkerMsg::BlocksProcessed(imported, count, results) => {
119 link.blocks_processed(imported, count, results)
120 },
121 BlockImportWorkerMsg::JustificationImported(who, hash, number, import_result) => {
122 link.justification_imported(who, &hash, number, import_result)
123 },
124 BlockImportWorkerMsg::RequestJustification(hash, number) => {
125 link.request_justification(&hash, number)
126 },
127 }
128 }
129
130 pub fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) -> Result<(), ()> {
139 loop {
140 let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
141 Poll::Ready(Some(msg)) => msg,
142 Poll::Ready(None) => break Err(()),
143 Poll::Pending => break Ok(()),
144 };
145
146 self.send_actions(msg, link);
147 }
148 }
149
150 pub async fn next_action(&mut self, link: &dyn Link<B>) -> Result<(), ()> {
152 if let Some(msg) = self.rx.next().await {
153 self.send_actions(msg, link);
154 return Ok(());
155 }
156 Err(())
157 }
158
159 pub fn close(&mut self) -> bool {
161 self.rx.get_mut().close()
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use soil_test::primitives::Block;
168
169 #[test]
170 fn is_closed() {
171 let (tx, rx) = super::buffered_link::<Block>(1);
172 assert!(!tx.is_closed());
173 drop(rx);
174 assert!(tx.is_closed());
175 }
176}