proto_tower_util/
test_io_service.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::task::{Context, Poll};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::sync::Mutex;
8use tower::Service;
9
10/// A service you can conveniently use to validate layers that interact with AsyncReadExt and AsyncWriteExt
11#[derive(Debug, Clone)]
12pub struct TestIoService {
13    pub read: Arc<Mutex<Vec<u8>>>,
14    pub write: Arc<Vec<u8>>,
15}
16
17impl TestIoService {
18    pub fn new(write: Vec<u8>) -> Self {
19        TestIoService {
20            read: Arc::new(Mutex::new(Vec::new())),
21            write: Arc::new(write),
22        }
23    }
24}
25
26impl<Reader, Writer> Service<(Reader, Writer)> for TestIoService
27where
28    Reader: AsyncReadExt + Send + Unpin + 'static,
29    Writer: AsyncWriteExt + Send + Unpin + 'static,
30{
31    type Response = ();
32    type Error = ();
33    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
34
35    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
36        Poll::Ready(Ok(()))
37    }
38
39    fn call(&mut self, (mut read, mut write): (Reader, Writer)) -> Self::Future {
40        let read_data = self.read.clone();
41        let write_data = self.write.clone();
42        Box::pin(async move {
43            let mut buffer = vec![0; 1024];
44            let sent_response = AtomicBool::new(false);
45            while let Ok(sz) = read.read(&mut buffer).await {
46                eprintln!("Read {} bytes", sz);
47                if sz == 0 {
48                    break;
49                }
50                let mut read_lock = read_data.lock().await;
51                read_lock.extend_from_slice(&buffer[..sz]);
52                if !sent_response.load(Ordering::SeqCst) {
53                    let l = write_data.len();
54                    eprintln!("Writing {} bytes", l);
55                    write.write_all(&write_data).await.unwrap();
56                    sent_response.store(true, Ordering::SeqCst);
57                }
58            }
59            Ok(())
60        })
61    }
62}