1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use actix_web::{Error, HttpResponse};
use actix_web::web::{Data, Payload};
use async_stream::__private::AsyncStream;
use bytes::Bytes;
use log::{debug, error};
use tokio::io::Interest;
use tokio::net::TcpStream;
use async_stream::stream;
use futures_core::Stream;
use futures_util::task::noop_waker_ref;
use crate::config::anttp_config::AntTpConfig;
/// Relays the request body to the TCP endpoint at `https_listen_address` and
/// streams the bytes that endpoint sends back as the HTTP response body.
///
/// The TCP connection is opened lazily, when the response stream is first
/// polled. From then on the stream alternates between two duties:
///
/// * reading from the server socket and yielding each read as a `Bytes` chunk;
/// * pulling chunks from `client_stream` and writing them to the server socket.
///
/// The response stream ends when the server closes its side of the connection
/// (a zero-length read), or when a read, write or payload error occurs (these
/// are logged). A failure to connect, or to query socket readiness, is yielded
/// to the response stream as an `Err` item.
///
/// # Errors
///
/// The function itself always returns `Ok`; failures surface through the
/// streaming body as described above.
pub async fn forward(
    ant_tp_config_data: Data<AntTpConfig>,
    mut client_stream: Payload,
) -> Result<HttpResponse, Error> {
    // Size of the reusable buffer for reads from the server socket. todo: tune
    const READ_BUFFER_SIZE: usize = 8 * 1024;
    // Pause between payload polls when neither direction made progress. todo: tune
    const IDLE_POLL_INTERVAL: Duration = Duration::from_millis(5);

    let client_writer: AsyncStream<Result<Bytes, Error>, _> = stream! {
        // Connect to a peer
        let server_stream = TcpStream::connect(ant_tp_config_data.https_listen_address).await?;

        // Allocated once and reused for every read.
        let mut read_buf = vec![0u8; READ_BUFFER_SIZE];
        // Client bytes taken from the payload but not yet accepted by the socket.
        // Kept across iterations so short writes and `WouldBlock` never drop data.
        let mut pending = Bytes::new();
        // Set once the payload reports end-of-stream; it is not polled again after that.
        let mut client_done = false;

        loop {
            // Once the request body is fully forwarded there is nothing left to
            // write, so wait for readability only. A socket is almost always
            // writable, and asking for WRITABLE would wake the loop for nothing.
            let interest = if client_done && pending.is_empty() {
                Interest::READABLE
            } else {
                Interest::READABLE | Interest::WRITABLE
            };
            let ready = server_stream.ready(interest).await?;
            let mut idle = true;

            if ready.is_readable() {
                // Try to read data, this may still fail with `WouldBlock`
                // if the readiness event is a false positive.
                match server_stream.try_read(&mut read_buf) {
                    Ok(0) => {
                        // Zero bytes into a non-empty buffer means the server has
                        // closed its sending side: the response is complete.
                        debug!("server closed the connection");
                        break;
                    }
                    Ok(n) => {
                        debug!("read {} bytes from server", n);
                        yield Ok(Bytes::copy_from_slice(&read_buf[..n]));
                        idle = false;
                    }
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        debug!("read: WouldBlock");
                    }
                    Err(e) => {
                        error!("error reading bytes: {}", e);
                        break;
                    }
                }
            }

            if ready.is_writable() {
                // Only take a new chunk once the previous one is fully written.
                if pending.is_empty() && !client_done {
                    // todo: register a real Waker instead of polling with a no-op one
                    let pin_client_stream = Pin::new(&mut client_stream);
                    match pin_client_stream.poll_next(&mut Context::from_waker(noop_waker_ref())) {
                        Poll::Ready(Some(Ok(chunk))) => pending = chunk,
                        Poll::Ready(Some(Err(e))) => {
                            error!("error polling payload: {}", e);
                            break;
                        }
                        Poll::Ready(None) => {
                            debug!("client payload exhausted");
                            client_done = true;
                        }
                        Poll::Pending => {}
                    }
                }

                if !pending.is_empty() {
                    // Try to write data, this may still fail with `WouldBlock`
                    // if the readiness event is a false positive.
                    match server_stream.try_write(&pending) {
                        Ok(0) => {
                            // No progress on a non-empty buffer; bail out rather than spin.
                            error!("error writing bytes: server accepted 0 bytes");
                            break;
                        }
                        Ok(n) => {
                            debug!("write {} bytes to server", n);
                            // Drop the written prefix; the rest stays queued for the next pass.
                            let _ = pending.split_to(n);
                            idle = false;
                        }
                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                            debug!("write: WouldBlock");
                            // Write readiness is now cleared, so the next `ready()`
                            // call waits for the socket; no extra sleep is needed.
                            idle = false;
                        }
                        Err(e) => {
                            error!("error writing bytes: {}", e);
                            break;
                        }
                    }
                }
            }

            if idle {
                // no reads or writes, so sleep for a bit before polling again
                tokio::time::sleep(IDLE_POLL_INTERVAL).await;
            }
        }
    };

    Ok(HttpResponse::Ok().streaming(client_writer))
}