Function rserver::read_stream
source · Expand description
Read data from a TcpStream. Data is returned as Vec
Examples found in repository?
src/server.rs (line 49)
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
async fn handle_client(
client: &mut TcpStream,
config: Config,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let request_buffer = read_stream(client).await?;
println!(
"******************* Request Received *****************\n{}\n",
String::from_utf8_lossy(&request_buffer).trim()
);
let request = Request::from(request_buffer);
// println!("request: {:?}", request);
connect_and_handle_client_request(client, request, &config).await?;
Ok(())
}
/// Read data from a TcpStream. Data is returned as Vec<u8> (bytes).
pub async fn read_stream(
stream: &mut TcpStream,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
let mut buffer: Vec<u8> = Vec::new();
let ready = stream.ready(Interest::READABLE).await?;
// println!("Stream: {:?}", stream);
// println!("Ready to read: {:?}", ready);
if ready.is_readable() {
let buffer_size: usize = 1024;
loop {
let mut fixed_buffer = vec![0; buffer_size];
match stream.read(&mut fixed_buffer).await {
Ok(n) if n == 0 => break,
Ok(n) if n < buffer_size => {
buffer.append(&mut fixed_buffer[..n].to_vec());
break;
}
Ok(_) => {
buffer.append(&mut fixed_buffer);
}
Err(e) => {
println!("Error in reading stram data: {}", e);
break;
}
}
}
}
Ok(buffer)
}
/// Connect to remote address or proxy & handle client request.
async fn connect_and_handle_client_request(
client: &mut TcpStream,
request: Request,
config: &Config,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Handling client request....");
let address = if config.enable_proxy {
format!("{}:{}", config.proxy_host, config.proxy_port)
} else {
format!("{}:{}", request.host, request.port)
};
println!("Connecting to the remote host ({})", address);
let mut remote = TcpStream::connect(address.clone()).await?;
println!("Connected to the remote host ({})", address);
match request.method.as_str() {
"CONNECT" => handle_connect(client, request, &mut remote).await?,
_ => handle_default(client, request, &mut remote).await?,
}
// remote.flush().await?;
// client.flush().await?;
// remote.shutdown().await?;
// client.shutdown().await?;
println!("******** Complete Response sent to the client ********\n");
Ok(())
}
/// Handle requests which are not CONNECT, i.e. GET, POST, etc.
async fn handle_default(
client: &mut TcpStream,
request: Request,
remote: &mut TcpStream,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Handling non-HTTPS request....");
match remote.write(&request.raw_data).await {
Ok(n) => println!(
"Wrote {} bytes and data to remote: {:?}",
n,
String::from_utf8_lossy(&request.raw_data)
),
Err(e) => println!("Write error in remote: {}", e),
}
match read_stream(remote).await {
Ok(response) => {
println!(
"Read {} bytes and data from server: {:?}",
response.len(),
String::from_utf8_lossy(&response)
);
match client.write(&response).await {
Ok(n) => println!(
"Wrote {} bytes and data to client: {:?}",
n,
String::from_utf8_lossy(&response)
),
Err(e) => println!("Write error in client: {}", e),
}
}
Err(e) => println!("Write error in client: {}", e),
}
Ok(())
}