proto_tower_util/
test_io_service.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::task::{Context, Poll};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::sync::Mutex;
8use tower::Service;
9
10#[derive(Debug, Clone)]
12pub struct TestIoService {
13 pub read: Arc<Mutex<Vec<u8>>>,
14 pub write: Arc<Vec<u8>>,
15}
16
17impl TestIoService {
18 pub fn new(write: Vec<u8>) -> Self {
19 TestIoService {
20 read: Arc::new(Mutex::new(Vec::new())),
21 write: Arc::new(write),
22 }
23 }
24}
25
26impl<Reader, Writer> Service<(Reader, Writer)> for TestIoService
27where
28 Reader: AsyncReadExt + Send + Unpin + 'static,
29 Writer: AsyncWriteExt + Send + Unpin + 'static,
30{
31 type Response = ();
32 type Error = ();
33 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
34
35 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
36 Poll::Ready(Ok(()))
37 }
38
39 fn call(&mut self, (mut read, mut write): (Reader, Writer)) -> Self::Future {
40 let read_data = self.read.clone();
41 let write_data = self.write.clone();
42 Box::pin(async move {
43 let mut buffer = vec![0; 1024];
44 let sent_response = AtomicBool::new(false);
45 while let Ok(sz) = read.read(&mut buffer).await {
46 eprintln!("Read {} bytes", sz);
47 if sz == 0 {
48 break;
49 }
50 let mut read_lock = read_data.lock().await;
51 read_lock.extend_from_slice(&buffer[..sz]);
52 if !sent_response.load(Ordering::SeqCst) {
53 let l = write_data.len();
54 eprintln!("Writing {} bytes", l);
55 write.write_all(&write_data).await.unwrap();
56 sent_response.store(true, Ordering::SeqCst);
57 }
58 }
59 Ok(())
60 })
61 }
62}